From 99eeb3c063234326132cf8e786ed5b857be77e94 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Wed, 18 May 2022 01:27:06 -0700 Subject: [PATCH] feat(cli): added CLI for controlling throttler (#1956) Supported are: ``` $ kopia throttle set \ --download-bytes-per-second=N | unlimited --upload-bytes-per-second=N | unlimited --read-requests-per-second=N | unlimited --write-requests-per-second=N | unlimited --list-requests-per-second=N | unlimited --concurrent-reads=N | unlimited --concurrent-writes=N | unlimited ``` To change parameters of a running server use: ``` $ kopia server throttle set \ --address= \ --server-control-password= \ --download-bytes-per-second=N | unlimited --upload-bytes-per-second=N | unlimited --read-requests-per-second=N | unlimited --write-requests-per-second=N | unlimited --list-requests-per-second=N | unlimited --concurrent-reads=N | unlimited --concurrent-writes=N | unlimited ``` --- cli/command_repo_throttle_test.go | 81 +++++++++++ cli/command_repository.go | 2 + cli/command_repository_throttle.go | 13 ++ cli/command_repository_throttle_get.go | 28 ++++ cli/command_repository_throttle_set.go | 38 +++++ cli/command_server.go | 2 + cli/command_server_control_test.go | 53 +++++++ cli/command_server_throttle.go | 12 ++ cli/command_server_throttle_get.go | 33 +++++ cli/command_server_throttle_set.go | 50 +++++++ cli/throttle_get.go | 49 +++++++ cli/throttle_set.go | 130 ++++++++++++++++++ internal/server/server.go | 2 + internal/testutil/testutil.go | 8 +- repo/blob/throttling/throttler.go | 67 ++++++++- repo/blob/throttling/throttling_semaphore.go | 63 +++++++++ .../throttling/throttling_semaphore_test.go | 64 +++++++++ repo/blob/throttling/throttling_storage.go | 10 ++ .../throttling/throttling_storage_test.go | 13 ++ repo/local_config.go | 3 + repo/open.go | 35 +++-- repo/repository.go | 3 +- 22 files changed, 737 insertions(+), 22 deletions(-) create mode 100644 cli/command_repo_throttle_test.go create mode 100644 cli/command_repository_throttle.go create mode 100644 cli/command_repository_throttle_get.go create mode 100644 cli/command_repository_throttle_set.go create mode 100644 cli/command_server_throttle.go create mode 100644 cli/command_server_throttle_get.go create mode 100644 cli/command_server_throttle_set.go create mode 100644 cli/throttle_get.go create mode 100644 cli/throttle_set.go create mode 100644 repo/blob/throttling/throttling_semaphore.go create mode 100644 repo/blob/throttling/throttling_semaphore_test.go diff --git a/cli/command_repo_throttle_test.go b/cli/command_repo_throttle_test.go new file mode 100644 index 000000000..e8b1a7bd7 --- /dev/null +++ b/cli/command_repo_throttle_test.go @@ -0,0 +1,81 @@ +package cli_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/blob/throttling" + "github.com/kopia/kopia/tests/testenv" +) + +func TestRepoThrottle(t *testing.T) { + t.Parallel() + + env := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, testenv.NewInProcRunner(t)) + + env.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", env.RepoDir) + defer env.RunAndExpectSuccess(t, "repo", "disconnect") + + require.Equal(t, []string{ + "Max Download Speed: (unlimited)", + "Max Upload Speed: (unlimited)", + "Max Read Requests Per Second: (unlimited)", + "Max Write Requests Per Second: (unlimited)", + "Max List Requests Per Second: (unlimited)", + "Max Concurrent Reads: (unlimited)", + "Max Concurrent Writes: (unlimited)", + }, env.RunAndExpectSuccess(t, "repo", "throttle", "get")) + + env.RunAndExpectSuccess(t, "repo", "throttle", "set", + "--download-bytes-per-second=1000000000", + "--upload-bytes-per-second=2000000000", + "--read-requests-per-second=300", + "--write-requests-per-second=400", + "--list-requests-per-second=500", + "--concurrent-reads=300", + "--concurrent-writes=400", + ) + + env.RunAndExpectFailure(t, "repo", "throttle", "set", "--download-bytes-per-second=-30") + env.RunAndExpectFailure(t, "repo", "throttle", "set", "--concurrent-reads=-3") + + require.Equal(t, []string{ + "Max Download Speed: 1 GB/s", + "Max Upload Speed: 2 GB/s", + "Max Read Requests Per Second: 300", + "Max Write Requests Per Second: 400", + "Max List Requests Per Second: 500", + "Max Concurrent Reads: 300", + "Max Concurrent Writes: 400", + }, env.RunAndExpectSuccess(t, "repo", "throttle", "get")) + + env.RunAndExpectSuccess(t, "repo", "throttle", "set", + "--upload-bytes-per-second=unlimited", + "--write-requests-per-second=unlimited", + ) + + require.Equal(t, []string{ + "Max Download Speed: 1 GB/s", + "Max Upload Speed: (unlimited)", + "Max Read Requests Per Second: 300", + "Max Write Requests Per Second: (unlimited)", + "Max List Requests Per Second: 500", + "Max Concurrent Reads: 300", + "Max Concurrent Writes: 400", + }, env.RunAndExpectSuccess(t, "repo", "throttle", "get")) + + var limits throttling.Limits + + testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "repo", "throttle", "get", "--json"), &limits) + require.Equal(t, throttling.Limits{ + ReadsPerSecond: 300, + WritesPerSecond: 0, + ListsPerSecond: 500, + UploadBytesPerSecond: 0, + DownloadBytesPerSecond: 1e+09, + ConcurrentReads: 300, + ConcurrentWrites: 400, + }, limits) +} diff --git a/cli/command_repository.go b/cli/command_repository.go index bb94d0e0a..839565804 100644 --- a/cli/command_repository.go +++ b/cli/command_repository.go @@ -10,6 +10,7 @@ type commandRepository struct { changePassword commandRepositoryChangePassword status commandRepositoryStatus syncTo commandRepositorySyncTo + throttle commandRepositoryThrottle validateProvider commandRepositoryValidateProvider } @@ -24,6 +25,7 @@ func (c *commandRepository) setup(svc advancedAppServices, parent commandParent) c.setParameters.setup(svc, cmd) c.status.setup(svc, cmd) c.syncTo.setup(svc, cmd) // nolint:contextcheck + c.throttle.setup(svc, cmd) c.changePassword.setup(svc, cmd) c.validateProvider.setup(svc, cmd) } diff --git a/cli/command_repository_throttle.go b/cli/command_repository_throttle.go new file mode 100644 index 000000000..52347840d --- /dev/null +++ b/cli/command_repository_throttle.go @@ -0,0 +1,13 @@ +package cli + +type commandRepositoryThrottle struct { + get commandRepositoryThrottleGet + set commandRepositoryThrottleSet +} + +func (c *commandRepositoryThrottle) setup(svc appServices, parent commandParent) { + cmd := parent.Command("throttle", "Commands to manipulate throttle configuration") + + c.get.setup(svc, cmd) + c.set.setup(svc, cmd) +} diff --git a/cli/command_repository_throttle_get.go b/cli/command_repository_throttle_get.go new file mode 100644 index 000000000..3d825a40b --- /dev/null +++ b/cli/command_repository_throttle_get.go @@ -0,0 +1,28 @@ +package cli + +import ( + "context" + + "github.com/kopia/kopia/repo" +) + +type commandRepositoryThrottleGet struct { + ctg commonThrottleGet +} + +func (c *commandRepositoryThrottleGet) setup(svc appServices, parent commandParent) { + cmd := parent.Command("get", "Get throttling parameters for a repository") + c.ctg.setup(svc, cmd) + + cmd.Action(svc.directRepositoryReadAction(c.run)) +} + +func (c *commandRepositoryThrottleGet) run(ctx context.Context, rep repo.DirectRepository) error { + limits := rep.Throttler().Limits() + + if err := c.ctg.output(&limits); err != nil { + return err + } + + return nil +} diff --git a/cli/command_repository_throttle_set.go b/cli/command_repository_throttle_set.go new file mode 100644 index 000000000..ffc1617a8 --- /dev/null +++ b/cli/command_repository_throttle_set.go @@ -0,0 +1,38 @@ +package cli + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo" +) + +type commandRepositoryThrottleSet struct { + cts commonThrottleSet +} + +func (c *commandRepositoryThrottleSet) setup(svc appServices, parent commandParent) { + cmd := parent.Command("set", "Set throttling parameters for a repository") + c.cts.setup(cmd) + + cmd.Action(svc.directRepositoryWriteAction(c.run)) +} + +func (c *commandRepositoryThrottleSet) run(ctx context.Context, rep repo.DirectRepositoryWriter) error { + thr := rep.Throttler() + limits := thr.Limits() + + var changeCount int + + if err := c.cts.apply(ctx, &limits, &changeCount); err != nil { + return err + } + + if changeCount == 0 { + log(ctx).Infof("No changes made.") + return nil + } + + return errors.Wrap(rep.Throttler().SetLimits(limits), "error setting limits") +} diff --git a/cli/command_server.go b/cli/command_server.go index d46fc73ee..191e680e1 100644 --- a/cli/command_server.go +++ b/cli/command_server.go @@ -17,6 +17,7 @@ type commandServer struct { resume commandServerResume start commandServerStart status commandServerStatus + throttle commandServerThrottle upload commandServerUpload shutdown commandServerShutdown } @@ -70,6 +71,7 @@ func (c *commandServer) setup(svc advancedAppServices, parent commandParent) { c.cancel.setup(svc, cmd) c.pause.setup(svc, cmd) c.resume.setup(svc, cmd) + c.throttle.setup(svc, cmd) } func (c *serverClientFlags) serverAPIClientOptions() (apiclient.Options, error) { diff --git a/cli/command_server_control_test.go b/cli/command_server_control_test.go index 7dc718ed8..161d9fe20 100644 --- a/cli/command_server_control_test.go +++ b/cli/command_server_control_test.go @@ -7,6 +7,7 @@ "github.com/stretchr/testify/require" "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/blob/throttling" "github.com/kopia/kopia/tests/testenv" ) @@ -92,6 +93,58 @@ func TestServerControl(t *testing.T) { env.RunAndExpectSuccess(t, "server", "pause", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, dir1) env.RunAndExpectSuccess(t, "server", "resume", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, dir1) + env.RunAndExpectSuccess(t, "server", "throttle", "set", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, + "--download-bytes-per-second=1000000000", + "--upload-bytes-per-second=2000000000", + "--read-requests-per-second=300", + "--write-requests-per-second=400", + "--list-requests-per-second=500", + "--concurrent-reads=300", + "--concurrent-writes=400", + ) + + require.Equal(t, []string{ + "Max Download Speed: 1 GB/s", + "Max Upload Speed: 2 GB/s", + "Max Read Requests Per Second: 300", + "Max Write Requests Per Second: 400", + "Max List Requests Per Second: 500", + "Max Concurrent Reads: 300", + "Max Concurrent Writes: 400", + }, env.RunAndExpectSuccess(t, "server", "throttle", "get", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword)) + + env.RunAndExpectSuccess(t, "server", "throttle", "set", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, + "--upload-bytes-per-second=unlimited", + "--write-requests-per-second=unlimited", + ) + + env.RunAndExpectFailure(t, "server", "throttle", "set", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, + "--upload-bytes-per-second=-10", + ) + + require.Equal(t, []string{ + "Max Download Speed: 1 GB/s", + "Max Upload Speed: (unlimited)", + "Max Read Requests Per Second: 300", + "Max Write Requests Per Second: (unlimited)", + "Max List Requests Per Second: 500", + "Max Concurrent Reads: 300", + "Max Concurrent Writes: 400", + }, env.RunAndExpectSuccess(t, "server", "throttle", "get", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword)) + + var limits throttling.Limits + + testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "server", "throttle", "get", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, "--json"), &limits) + require.Equal(t, throttling.Limits{ + ReadsPerSecond: 300, + WritesPerSecond: 0, + ListsPerSecond: 500, + UploadBytesPerSecond: 0, + DownloadBytesPerSecond: 1e+09, + ConcurrentReads: 300, + ConcurrentWrites: 400, + }, limits) + env.RunAndExpectSuccess(t, "server", "shutdown", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword) select { diff --git a/cli/command_server_throttle.go b/cli/command_server_throttle.go new file mode 100644 index 000000000..30dc7a02e --- /dev/null +++ b/cli/command_server_throttle.go @@ -0,0 +1,12 @@ +package cli + +type commandServerThrottle struct { + get commandServerThrottleGet + set commandServerThrottleSet +} + +func (c *commandServerThrottle) setup(svc appServices, parent commandParent) { + cmd := parent.Command("throttle", "Control throttling parameters for a running server") + c.get.setup(svc, cmd) + c.set.setup(svc, cmd) +} diff --git a/cli/command_server_throttle_get.go b/cli/command_server_throttle_get.go new file mode 100644 index 000000000..151492a31 --- /dev/null +++ b/cli/command_server_throttle_get.go @@ -0,0 +1,33 @@ +package cli + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/apiclient" + "github.com/kopia/kopia/repo/blob/throttling" +) + +type commandServerThrottleGet struct { + sf serverClientFlags + + ctg commonThrottleGet +} + +func (c *commandServerThrottleGet) setup(svc appServices, parent commandParent) { + cmd := parent.Command("get", "Get throttling parameters for a running server") + c.sf.setup(cmd) + c.ctg.setup(svc, cmd) + cmd.Action(svc.serverAction(&c.sf, c.run)) +} + +func (c *commandServerThrottleGet) run(ctx context.Context, cli *apiclient.KopiaAPIClient) error { + var limits throttling.Limits + + if err := cli.Get(ctx, "control/throttle", nil, &limits); err != nil { + return errors.Wrap(err, "unable to get current throttle") + } + + return c.ctg.output(&limits) +} diff --git a/cli/command_server_throttle_set.go b/cli/command_server_throttle_set.go new file mode 100644 index 000000000..ee018c151 --- /dev/null +++ b/cli/command_server_throttle_set.go @@ -0,0 +1,50 @@ +package cli + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/apiclient" + "github.com/kopia/kopia/internal/serverapi" + "github.com/kopia/kopia/repo/blob/throttling" +) + +type commandServerThrottleSet struct { + sf serverClientFlags + + cts commonThrottleSet +} + +func (c *commandServerThrottleSet) setup(svc appServices, parent commandParent) { + cmd := parent.Command("set", "Set throttling parameters for a running server") + c.sf.setup(cmd) + c.cts.setup(cmd) + + cmd.Action(svc.serverAction(&c.sf, c.run)) +} + +func (c *commandServerThrottleSet) run(ctx context.Context, cli *apiclient.KopiaAPIClient) error { + var limits throttling.Limits + + if err := cli.Get(ctx, "control/throttle", nil, &limits); err != nil { + return errors.Wrap(err, "unable to get current throttle") + } + + var changeCount int + + if err := c.cts.apply(ctx, &limits, &changeCount); err != nil { + return err + } + + if changeCount == 0 { + log(ctx).Infof("No changes made.") + return nil + } + + if err := cli.Put(ctx, "control/throttle", &limits, &serverapi.Empty{}); err != nil { + return errors.Wrap(err, "unable to change throttle") + } + + return nil +} diff --git a/cli/throttle_get.go b/cli/throttle_get.go new file mode 100644 index 000000000..39d5d453c --- /dev/null +++ b/cli/throttle_get.go @@ -0,0 +1,49 @@ +package cli + +import ( + "strconv" + + "github.com/alecthomas/kingpin" + + "github.com/kopia/kopia/internal/units" + "github.com/kopia/kopia/repo/blob/throttling" +) + +type commonThrottleGet struct { + out textOutput + jo jsonOutput +} + +func (c *commonThrottleGet) setup(svc appServices, cmd *kingpin.CmdClause) { + c.out.setup(svc) + c.jo.setup(svc, cmd) +} + +func (c *commonThrottleGet) output(limits *throttling.Limits) error { + if c.jo.jsonOutput { + c.out.printStdout("%s\n", c.jo.jsonBytes(limits)) + return nil + } + + c.printValueOrUnlimited("Max Download Speed:", limits.DownloadBytesPerSecond, units.BytesPerSecondsString) + c.printValueOrUnlimited("Max Upload Speed:", limits.UploadBytesPerSecond, units.BytesPerSecondsString) + c.printValueOrUnlimited("Max Read Requests Per Second:", limits.ReadsPerSecond, c.floatToString) + c.printValueOrUnlimited("Max Write Requests Per Second:", limits.WritesPerSecond, c.floatToString) + c.printValueOrUnlimited("Max List Requests Per Second:", limits.ListsPerSecond, c.floatToString) + c.printValueOrUnlimited("Max Concurrent Reads:", float64(limits.ConcurrentReads), c.floatToString) + c.printValueOrUnlimited("Max Concurrent Writes:", float64(limits.ConcurrentWrites), c.floatToString) + + return nil +} + +func (c *commonThrottleGet) printValueOrUnlimited(label string, v float64, convert func(v float64) string) { + if v != 0 { + c.out.printStdout("%-30v %v\n", label, convert(v)) + } else { + c.out.printStdout("%-30v (unlimited)\n", label) + } +} + +func (c *commonThrottleGet) floatToString(v float64) string { + return strconv.FormatFloat(v, 'f', 0, 64) // nolint:gomnd +} diff --git a/cli/throttle_set.go b/cli/throttle_set.go new file mode 100644 index 000000000..e148b0551 --- /dev/null +++ b/cli/throttle_set.go @@ -0,0 +1,130 @@ +package cli + +import ( + "context" + "strconv" + + "github.com/alecthomas/kingpin" + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/units" + "github.com/kopia/kopia/repo/blob/throttling" +) + +type commonThrottleSet struct { + setDownloadBytesPerSecond string + setUploadBytesPerSecond string + setReadsPerSecond string + setWritesPerSecond string + setListsPerSecond string + setConcurrentReads string + setConcurrentWrites string +} + +func (c *commonThrottleSet) setup(cmd *kingpin.CmdClause) { + cmd.Flag("download-bytes-per-second", "Set the download bytes per second").StringVar(&c.setDownloadBytesPerSecond) + cmd.Flag("upload-bytes-per-second", "Set the upload bytes per second").StringVar(&c.setUploadBytesPerSecond) + cmd.Flag("read-requests-per-second", "Set max reads per second").StringVar(&c.setReadsPerSecond) + cmd.Flag("write-requests-per-second", "Set max writes per second").StringVar(&c.setWritesPerSecond) + cmd.Flag("list-requests-per-second", "Set max lists per second").StringVar(&c.setListsPerSecond) + cmd.Flag("concurrent-reads", "Set max concurrent reads").StringVar(&c.setConcurrentReads) + cmd.Flag("concurrent-writes", "Set max concurrent writes").StringVar(&c.setConcurrentWrites) +} + +func (c *commonThrottleSet) apply(ctx context.Context, limits *throttling.Limits, changeCount *int) error { + if err := c.setThrottleFloat64(ctx, "max download speed", true, &limits.DownloadBytesPerSecond, c.setDownloadBytesPerSecond, changeCount); err != nil { + return err + } + + if err := c.setThrottleFloat64(ctx, "max upload speed", true, &limits.UploadBytesPerSecond, c.setUploadBytesPerSecond, changeCount); err != nil { + return err + } + + if err := c.setThrottleFloat64(ctx, "reads per second", false, &limits.ReadsPerSecond, c.setReadsPerSecond, changeCount); err != nil { + return err + } + + if err := c.setThrottleFloat64(ctx, "writes per second", false, &limits.WritesPerSecond, c.setWritesPerSecond, changeCount); err != nil { + return err + } + + if err := c.setThrottleFloat64(ctx, "lists per second", false, &limits.ListsPerSecond, c.setListsPerSecond, changeCount); err != nil { + return err + } + + if err := c.setThrottleInt(ctx, "concurrent reads", &limits.ConcurrentReads, c.setConcurrentReads, changeCount); err != nil { + return err + } + + if err := c.setThrottleInt(ctx, "concurrent writes", &limits.ConcurrentWrites, c.setConcurrentWrites, changeCount); err != nil { + return err + } + + return nil +} + +func (c *commonThrottleSet) setThrottleFloat64(ctx context.Context, desc string, bps bool, val *float64, str string, changeCount *int) error { + if str == "" { + // not changed + return nil + } + + if str == "unlimited" || str == "-" { + *changeCount++ + + log(ctx).Infof("Setting %v to a unlimited.", desc) + + *val = 0 + + return nil + } + + // nolint:gomnd + v, err := strconv.ParseFloat(str, 64) + if err != nil { + return errors.Wrapf(err, "can't parse the %v %q", desc, str) + } + + *changeCount++ + + if bps { + log(ctx).Infof("Setting %v to %v.", desc, units.BytesPerSecondsString(v)) + } else { + log(ctx).Infof("Setting %v to %v.", desc, v) + } + + *val = v + + return nil +} + +func (c *commonThrottleSet) setThrottleInt(ctx context.Context, desc string, val *int, str string, changeCount *int) error { + if str == "" { + // not changed + return nil + } + + if str == "unlimited" || str == "-" { + *changeCount++ + + log(ctx).Infof("Setting %v to a unlimited.", desc) + + *val = 0 + + return nil + } + + // nolint:gomnd + v, err := strconv.ParseInt(str, 10, 64) + if err != nil { + return errors.Wrapf(err, "can't parse the %v %q", desc, str) + } + + *changeCount++ + + log(ctx).Infof("Setting %v to %v.", desc, v) + + *val = int(v) + + return nil +} diff --git a/internal/server/server.go b/internal/server/server.go index ad2bace29..366f8001b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -177,6 +177,8 @@ func (s *Server) SetupControlAPIHandlers(m *mux.Router) { m.HandleFunc("/api/v1/control/cancel-snapshot", s.handleServerControlAPI(handleCancel)).Methods(http.MethodPost) m.HandleFunc("/api/v1/control/pause-source", s.handleServerControlAPI(handlePause)).Methods(http.MethodPost) m.HandleFunc("/api/v1/control/resume-source", s.handleServerControlAPI(handleResume)).Methods(http.MethodPost) + m.HandleFunc("/api/v1/control/throttle", s.handleServerControlAPI(handleRepoGetThrottle)).Methods(http.MethodGet) + m.HandleFunc("/api/v1/control/throttle", s.handleServerControlAPI(handleRepoSetThrottle)).Methods(http.MethodPut) } func isAuthenticated(rc requestContext) bool { diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 9ae71ddb3..060d1f0bc 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -108,10 +108,12 @@ func MyTestMain(m *testing.M, cleanups ...func()) { v := m.Run() - if err := releasable.Verify(); err != nil { - log.Printf("found leaks: %v", err) + if v == 0 { + if err := releasable.Verify(); err != nil { + log.Printf("found leaks: %v", err) - v = 1 + v = 1 + } } for _, c := range cleanups { diff --git a/repo/blob/throttling/throttler.go b/repo/blob/throttling/throttler.go index 53aa80f67..1fdd6ad26 100644 --- a/repo/blob/throttling/throttler.go +++ b/repo/blob/throttling/throttler.go @@ -14,8 +14,12 @@ type SettableThrottler interface { Limits() Limits SetLimits(limits Limits) error + OnUpdate(handler UpdatedHandler) } +// UpdatedHandler is invoked as part of SetLimits() after limits are updated. +type UpdatedHandler func(l Limits) error + type tokenBucketBasedThrottler struct { mu sync.Mutex // +checklocks:mu @@ -26,7 +30,13 @@ type tokenBucketBasedThrottler struct { listOps *tokenBucket upload *tokenBucket download *tokenBucket - window time.Duration // +checklocksignore + + concurrentReads *semaphore + concurrentWrites *semaphore + + window time.Duration // +checklocksignore + + onUpdate []UpdatedHandler } func (t *tokenBucketBasedThrottler) BeforeOperation(ctx context.Context, op string) { @@ -35,8 +45,20 @@ func (t *tokenBucketBasedThrottler) BeforeOperation(ctx context.Context, op stri t.listOps.Take(ctx, 1) case operationGetBlob, operationGetMetadata: t.readOps.Take(ctx, 1) + t.concurrentReads.Acquire() case operationPutBlob, operationDeleteBlob: t.writeOps.Take(ctx, 1) + t.concurrentWrites.Acquire() + } +} + +func (t *tokenBucketBasedThrottler) AfterOperation(ctx context.Context, op string) { + switch op { + case operationListBlobs: + case operationGetBlob, operationGetMetadata: + t.concurrentReads.Release() + case operationPutBlob, operationDeleteBlob: + t.concurrentWrites.Release() } } @@ -64,8 +86,23 @@ func (t *tokenBucketBasedThrottler) SetLimits(limits Limits) error { t.mu.Lock() defer t.mu.Unlock() + if err := t.setLimits(limits); err != nil { + _ = t.setLimits(t.limits) + return err + } + t.limits = limits + for _, h := range t.onUpdate { + if err := h(limits); err != nil { + return err + } + } + + return nil +} + +func (t *tokenBucketBasedThrottler) setLimits(limits Limits) error { if err := t.readOps.SetLimit(limits.ReadsPerSecond * t.window.Seconds()); err != nil { return errors.Wrap(err, "ReadsPerSecond") } @@ -86,9 +123,21 @@ func (t *tokenBucketBasedThrottler) SetLimits(limits Limits) error { return errors.Wrap(err, "DownloadBytesPerSecond") } + if err := t.concurrentReads.SetLimit(limits.ConcurrentReads); err != nil { + return errors.Wrap(err, "ConcurrentReads") + } + + if err := t.concurrentWrites.SetLimit(limits.ConcurrentWrites); err != nil { + return errors.Wrap(err, "ConcurrentWrites") + } + return nil } +func (t *tokenBucketBasedThrottler) OnUpdate(handler UpdatedHandler) { + t.onUpdate = append(t.onUpdate, handler) +} + // Limits encapsulates all limits for a Throttler. type Limits struct { ReadsPerSecond float64 `json:"readsPerSecond,omitempty"` @@ -96,6 +145,8 @@ type Limits struct { ListsPerSecond float64 `json:"listsPerSecond,omitempty"` UploadBytesPerSecond float64 `json:"maxUploadSpeedBytesPerSecond,omitempty"` DownloadBytesPerSecond float64 `json:"maxDownloadSpeedBytesPerSecond,omitempty"` + ConcurrentReads int `json:"concurrentReads,omitempty"` + ConcurrentWrites int `json:"concurrentWrites,omitempty"` } var _ Throttler = (*tokenBucketBasedThrottler)(nil) @@ -103,12 +154,14 @@ type Limits struct { // NewThrottler returns a Throttler with provided limits. func NewThrottler(limits Limits, window time.Duration, initialFillRatio float64) (SettableThrottler, error) { t := &tokenBucketBasedThrottler{ - readOps: newTokenBucket("read-ops", initialFillRatio*limits.ReadsPerSecond*window.Seconds(), 0, window), - writeOps: newTokenBucket("write-ops", initialFillRatio*limits.WritesPerSecond*window.Seconds(), 0, window), - listOps: newTokenBucket("list-ops", initialFillRatio*limits.ListsPerSecond*window.Seconds(), 0, window), - upload: newTokenBucket("upload-bytes", initialFillRatio*limits.UploadBytesPerSecond*window.Seconds(), 0, window), - download: newTokenBucket("download-bytes", initialFillRatio*limits.DownloadBytesPerSecond*window.Seconds(), 0, window), - window: window, + readOps: newTokenBucket("read-ops", initialFillRatio*limits.ReadsPerSecond*window.Seconds(), 0, window), + writeOps: newTokenBucket("write-ops", initialFillRatio*limits.WritesPerSecond*window.Seconds(), 0, window), + listOps: newTokenBucket("list-ops", initialFillRatio*limits.ListsPerSecond*window.Seconds(), 0, window), + upload: newTokenBucket("upload-bytes", initialFillRatio*limits.UploadBytesPerSecond*window.Seconds(), 0, window), + download: newTokenBucket("download-bytes", initialFillRatio*limits.DownloadBytesPerSecond*window.Seconds(), 0, window), + concurrentReads: newSemaphore(), + concurrentWrites: newSemaphore(), + window: window, } if err := t.SetLimits(limits); err != nil { diff --git a/repo/blob/throttling/throttling_semaphore.go b/repo/blob/throttling/throttling_semaphore.go new file mode 100644 index 000000000..36faf09a1 --- /dev/null +++ b/repo/blob/throttling/throttling_semaphore.go @@ -0,0 +1,63 @@ +package throttling + +import ( + "sync" + + "github.com/pkg/errors" +) + +type semaphore struct { + mu sync.Mutex + // +checklocks:mu + sem chan struct{} +} + +func (s *semaphore) getChan() chan struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + return s.sem +} + +func (s *semaphore) Acquire() { + ch := s.getChan() + if ch == nil { + return + } + + // push to channel, may block + ch <- struct{}{} +} + +func (s *semaphore) Release() { + ch := s.getChan() + if ch == nil { + return + } + + select { + case <-ch: // removed one entry from channel + default: // this can happen when we reset a channel to a lower value + } +} + +func (s *semaphore) SetLimit(limit int) error { + s.mu.Lock() + defer s.mu.Unlock() + + if limit < 0 { + return errors.Errorf("invalid limit") + } + + if limit > 0 { + s.sem = make(chan struct{}, limit) + } else { + s.sem = nil + } + + return nil +} + +func newSemaphore() *semaphore { + return &semaphore{} +} diff --git a/repo/blob/throttling/throttling_semaphore_test.go b/repo/blob/throttling/throttling_semaphore_test.go new file mode 100644 index 000000000..5e4c4b098 --- /dev/null +++ b/repo/blob/throttling/throttling_semaphore_test.go @@ -0,0 +1,64 @@ +package throttling + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestThrottlingSemaphore(t *testing.T) { + s := newSemaphore() + // default is unlimited + s.Acquire() + s.Release() + + require.Error(t, s.SetLimit(-1)) + + for _, lim := range []int{3, 5, 7} { + require.NoError(t, s.SetLimit(lim)) + + var ( + wg sync.WaitGroup + mu sync.Mutex + concurrency int + maxConcurrency int + ) + + for i := 0; i < 10; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + for j := 0; j < 10; j++ { + s.Acquire() + + mu.Lock() + concurrency++ + + if concurrency > maxConcurrency { + maxConcurrency = concurrency + } + + mu.Unlock() + + time.Sleep(10 * time.Millisecond) + + mu.Lock() + concurrency-- + mu.Unlock() + + s.Release() + } + }() + } + + wg.Wait() + + // Equal() would probably work here due to Sleep(), but not risking a flake. + require.LessOrEqual(t, maxConcurrency, lim) + require.Greater(t, maxConcurrency, 0) + } +} diff --git a/repo/blob/throttling/throttling_storage.go b/repo/blob/throttling/throttling_storage.go index 9f95cc4f2..4ee2ff7e1 100644 --- a/repo/blob/throttling/throttling_storage.go +++ b/repo/blob/throttling/throttling_storage.go @@ -25,6 +25,7 @@ // attempted to ensure we don't exceed the desired rate of operations/bytes uploaded/downloaded. type Throttler interface { BeforeOperation(ctx context.Context, op string) + AfterOperation(ctx context.Context, op string) // BeforeDownload acquires the specified number of downloaded bytes // possibly blocking until enough are available. @@ -51,6 +52,8 @@ func (s *throttlingStorage) GetBlob(ctx context.Context, id blob.ID, offset, len } s.throttler.BeforeOperation(ctx, operationGetBlob) + defer s.throttler.AfterOperation(ctx, operationGetBlob) + s.throttler.BeforeDownload(ctx, acquired) output.Reset() @@ -73,17 +76,22 @@ func (s *throttlingStorage) GetBlob(ctx context.Context, id blob.ID, offset, len func (s *throttlingStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Metadata, error) { s.throttler.BeforeOperation(ctx, operationGetMetadata) + defer s.throttler.AfterOperation(ctx, operationGetMetadata) return s.Storage.GetMetadata(ctx, id) // nolint:wrapcheck } func (s *throttlingStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error { s.throttler.BeforeOperation(ctx, operationListBlobs) + defer s.throttler.AfterOperation(ctx, operationListBlobs) + return s.Storage.ListBlobs(ctx, blobIDPrefix, cb) // nolint:wrapcheck } func (s *throttlingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, opts blob.PutOptions) error { s.throttler.BeforeOperation(ctx, operationPutBlob) + defer s.throttler.AfterOperation(ctx, operationPutBlob) + s.throttler.BeforeUpload(ctx, int64(data.Length())) return s.Storage.PutBlob(ctx, id, data, opts) // nolint:wrapcheck @@ -91,6 +99,8 @@ func (s *throttlingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.B func (s *throttlingStorage) DeleteBlob(ctx context.Context, id blob.ID) error { s.throttler.BeforeOperation(ctx, operationDeleteBlob) + defer s.throttler.AfterOperation(ctx, operationDeleteBlob) + return s.Storage.DeleteBlob(ctx, id) // nolint:wrapcheck } diff --git a/repo/blob/throttling/throttling_storage_test.go b/repo/blob/throttling/throttling_storage_test.go index 4ef8d0f13..bf6d27d83 100644 --- a/repo/blob/throttling/throttling_storage_test.go +++ b/repo/blob/throttling/throttling_storage_test.go @@ -28,6 +28,10 @@ func (m *mockThrottler) BeforeOperation(ctx context.Context, op string) { m.activity = append(m.activity, fmt.Sprintf("BeforeOperation(%v)", op)) } +func (m *mockThrottler) AfterOperation(ctx context.Context, op string) { + m.activity = append(m.activity, fmt.Sprintf("AfterOperation(%v)", op)) +} + func (m *mockThrottler) BeforeDownload(ctx context.Context, numBytes int64) { m.activity = append(m.activity, fmt.Sprintf("BeforeDownload(%v)", numBytes)) } @@ -66,6 +70,7 @@ func TestThrottling(t *testing.T) { "inner.concurrency level reached", "inner.GetBlob", "ReturnUnusedDownloadBytes(20000000)", + "AfterOperation(GetBlob)", }, m.activity) // upload blob of 7 bytes @@ -75,6 +80,7 @@ func TestThrottling(t *testing.T) { "BeforeOperation(PutBlob)", "BeforeUpload(7)", "inner.PutBlob", + "AfterOperation(PutBlob)", }, m.activity) // upload another blob of 30MB @@ -84,6 +90,7 @@ func TestThrottling(t *testing.T) { "BeforeOperation(PutBlob)", "BeforeUpload(30000000)", "inner.PutBlob", + "AfterOperation(PutBlob)", }, m.activity) m.Reset() @@ -93,6 +100,7 @@ func TestThrottling(t *testing.T) { "BeforeDownload(20000000)", // length is unknown, we assume 20MB "inner.GetBlob", "ReturnUnusedDownloadBytes(19999993)", // refund all but 7 bytes + "AfterOperation(GetBlob)", }, m.activity) m.Reset() @@ -102,6 +110,7 @@ func TestThrottling(t *testing.T) { "BeforeDownload(20000000)", // length is unknown, we assume 20MB "inner.GetBlob", "BeforeDownload(10000000)", // we downloaded more than expected, acquire more + "AfterOperation(GetBlob)", }, m.activity) m.Reset() @@ -110,6 +119,7 @@ func TestThrottling(t *testing.T) { "BeforeOperation(GetBlob)", "BeforeDownload(4)", "inner.GetBlob", + "AfterOperation(GetBlob)", }, m.activity) m.Reset() @@ -119,6 +129,7 @@ func TestThrottling(t *testing.T) { require.Equal(t, []string{ "BeforeOperation(GetMetadata)", "inner.GetMetadata", + "AfterOperation(GetMetadata)", }, m.activity) m.Reset() @@ -126,6 +137,7 @@ func TestThrottling(t *testing.T) { require.Equal(t, []string{ "BeforeOperation(DeleteBlob)", "inner.DeleteBlob", + "AfterOperation(DeleteBlob)", }, m.activity) m.Reset() @@ -135,5 +147,6 @@ func TestThrottling(t *testing.T) { require.Equal(t, []string{ "BeforeOperation(ListBlobs)", "inner.ListBlobs", + "AfterOperation(ListBlobs)", }, m.activity) } diff --git a/repo/local_config.go b/repo/local_config.go index 383962dfe..bc2a593ca 100644 --- a/repo/local_config.go +++ b/repo/local_config.go @@ -13,6 +13,7 @@ "github.com/kopia/kopia/internal/atomicfile" "github.com/kopia/kopia/internal/ospath" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/throttling" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/object" ) @@ -32,6 +33,8 @@ type ClientOptions struct { EnableActions bool `json:"enableActions"` FormatBlobCacheDuration time.Duration `json:"formatBlobCacheDuration,omitempty"` + + Throttling *throttling.Limits `json:"throttlingLimits,omitempty"` } // ApplyDefaults returns a copy of ClientOptions with defaults filled out. diff --git a/repo/open.go b/repo/open.go index 015332d78..aa31e7fba 100644 --- a/repo/open.go +++ b/repo/open.go @@ -294,11 +294,27 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw cmOpts.RepositoryFormatBytes = nil } - st, throttler, err := addThrottler(ctx, st) + limits := throttlingLimitsFromConnectionInfo(ctx, st.ConnectionInfo()) + if lc.Throttling != nil { + limits = *lc.Throttling + } + + st, throttler, err := addThrottler(st, limits) if err != nil { return nil, errors.Wrap(err, "unable to add throttler") } + throttler.OnUpdate(func(l throttling.Limits) error { + lc2, err2 := LoadConfigFromFile(configFile) + if err2 != nil { + return err2 + } + + lc2.Throttling = &l + + return lc2.writeToFile(configFile) + }) + if blobcfg.IsRetentionEnabled() { st = wrapLockingStorage(st, blobcfg) } @@ -328,12 +344,11 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw } dr := &directRepository{ - cmgr: cm, - omgr: om, - blobs: st, - mmgr: manifests, - sm: scm, - throttler: throttler, + cmgr: cm, + omgr: om, + blobs: st, + mmgr: manifests, + sm: scm, directRepositoryParameters: directRepositoryParameters{ uniqueID: ufb.f.UniqueID, cachingOptions: *cacheOpts, @@ -344,6 +359,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw cliOpts: lc.ClientOptions.ApplyDefaults(ctx, "Repository in "+st.DisplayName()), configFile: configFile, nextWriterID: new(int32), + throttler: throttler, }, closed: make(chan struct{}), } @@ -373,9 +389,8 @@ func wrapLockingStorage(st blob.Storage, r content.BlobCfgBlob) blob.Storage { }) } -func addThrottler(ctx context.Context, st blob.Storage) (blob.Storage, throttling.SettableThrottler, error) { - throttler, err := throttling.NewThrottler( - throttlingLimitsFromConnectionInfo(ctx, st.ConnectionInfo()), throttlingWindow, throttleBucketInitialFill) +func addThrottler(st blob.Storage, limits throttling.Limits) (blob.Storage, throttling.SettableThrottler, error) { + throttler, err := throttling.NewThrottler(limits, throttlingWindow, throttleBucketInitialFill) if err != nil { return nil, nil, errors.Wrap(err, "unable to create throttler") } diff --git a/repo/repository.go b/repo/repository.go index 66547a75f..69e605d5f 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -87,6 +87,7 @@ type directRepositoryParameters struct { blobCfgBlob content.BlobCfgBlob formatEncryptionKey []byte nextWriterID *int32 + throttler throttling.SettableThrottler } // directRepository is an implementation of repository that directly manipulates underlying storage. @@ -99,8 +100,6 @@ type directRepository struct { mmgr *manifest.Manager sm *content.SharedManager - throttler throttling.SettableThrottler - closed chan struct{} }