feat(snapshots): support for controlling upload parallelism via policies (#1850)

This commit is contained in:
Jarek Kowalski
2022-03-26 12:20:49 -07:00
committed by GitHub
parent d7e10dba59
commit 92683c5a5e
14 changed files with 222 additions and 29 deletions

View File

@@ -23,6 +23,7 @@ type commandPolicySet struct {
policyLoggingFlags
policyRetentionFlags
policySchedulingFlags
policyUploadFlags
}
func (c *commandPolicySet) setup(svc appServices, parent commandParent) {
@@ -37,6 +38,7 @@ func (c *commandPolicySet) setup(svc appServices, parent commandParent) {
c.policyLoggingFlags.setup(cmd)
c.policyRetentionFlags.setup(cmd)
c.policySchedulingFlags.setup(cmd)
c.policyUploadFlags.setup(cmd)
cmd.Action(svc.repositoryWriterAction(c.run))
}
@@ -113,6 +115,10 @@ func (c *commandPolicySet) setPolicyFromFlags(ctx context.Context, p *policy.Pol
return errors.Wrap(err, "actions policy")
}
if err := c.setUploadPolicyFromFlags(ctx, &p.UploadPolicy, changeCount); err != nil {
return errors.Wrap(err, "upload policy")
}
// It's not really a list, just optional boolean, last one wins.
for _, inherit := range c.inherit {
*changeCount++

View File

@@ -0,0 +1,31 @@
package cli
import (
"context"
"github.com/alecthomas/kingpin"
"github.com/kopia/kopia/snapshot/policy"
)
// policyUploadFlags holds the raw string values of the upload-related
// command-line flags registered by setup.
type policyUploadFlags struct {
// raw value of the --max-parallel-snapshots flag
maxParallelUploads string
// raw value of the --max-parallel-file-reads flag
maxParallelFileReads string
}
// setup registers the upload-related policy flags on cmd. Both flags are
// captured as plain strings in the receiver; they are handed to
// applyOptionalInt later by setUploadPolicyFromFlags.
func (c *policyUploadFlags) setup(cmd *kingpin.CmdClause) {
	fileReadsFlag := cmd.Flag("max-parallel-file-reads", "Maximum number of parallel file reads")
	fileReadsFlag.StringVar(&c.maxParallelFileReads)

	snapshotsFlag := cmd.Flag("max-parallel-snapshots", "Maximum number of parallel snapshots (server, KopiaUI only)")
	snapshotsFlag.StringVar(&c.maxParallelUploads)
}
// setUploadPolicyFromFlags passes each captured flag value, together with the
// matching field of up, to applyOptionalInt. The settings are processed in
// order (file reads first, then snapshots) and the first error is returned
// as-is; changeCount is forwarded to applyOptionalInt untouched.
func (c *policyUploadFlags) setUploadPolicyFromFlags(ctx context.Context, up *policy.UploadPolicy, changeCount *int) error {
	settings := []struct {
		desc   string
		target **policy.OptionalInt
		raw    string
	}{
		{"max parallel file reads", &up.MaxParallelFileReads, c.maxParallelFileReads},
		{"max parallel snapshots", &up.MaxParallelSnapshots, c.maxParallelUploads},
	}

	for _, s := range settings {
		if err := applyOptionalInt(ctx, s.desc, s.target, s.raw, changeCount); err != nil {
			return err
		}
	}

	return nil
}

View File

@@ -0,0 +1,46 @@
package cli_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/tests/testenv"
)
// TestSetUploadPolicy checks the upload section of "policy show":
// the initial global values, how a directory without its own policy reports
// them as inherited, the effect of overriding both settings globally, and the
// effect of resetting both with the "default" keyword.
func TestSetUploadPolicy(t *testing.T) {
	e := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, testenv.NewInProcRunner(t))

	defer e.RunAndExpectSuccess(t, "repo", "disconnect")

	e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir)

	// initial global policy
	globalLines := compressSpaces(e.RunAndExpectSuccess(t, "policy", "show", "--global"))
	for _, want := range []string{
		" Max parallel snapshots (server/UI): 1 (defined for this target)",
		" Max parallel file reads: - (defined for this target)",
	} {
		require.Contains(t, globalLines, want)
	}

	// make some directory we'll be setting policy on
	td := testutil.TempDirectory(t)

	// the directory has no policy of its own, so it reports the global values
	inheritedLines := compressSpaces(e.RunAndExpectSuccess(t, "policy", "show", td))
	for _, want := range []string{
		" Max parallel snapshots (server/UI): 1 inherited from (global)",
		" Max parallel file reads: - inherited from (global)",
	} {
		require.Contains(t, inheritedLines, want)
	}

	// override both settings globally
	e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=7", "--max-parallel-file-reads=33")

	overriddenLines := compressSpaces(e.RunAndExpectSuccess(t, "policy", "show", td))
	for _, want := range []string{
		" Max parallel snapshots (server/UI): 7 inherited from (global)",
		" Max parallel file reads: 33 inherited from (global)",
	} {
		require.Contains(t, overriddenLines, want)
	}

	// reset both settings using the "default" keyword
	e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=default", "--max-parallel-file-reads=default")

	resetLines := compressSpaces(e.RunAndExpectSuccess(t, "policy", "show", td))
	for _, want := range []string{
		" Max parallel snapshots (server/UI): 1 inherited from (global)",
		" Max parallel file reads: - inherited from (global)",
	} {
		require.Contains(t, resetLines, want)
	}
}

View File

@@ -122,6 +122,8 @@ func printPolicy(out *textOutput, p *policy.Policy, def *policy.Definition) {
rows = append(rows, policyTableRow{})
rows = appendSchedulingPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendUploadPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendCompressionPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendActionsPolicyRows(rows, p, def)
@@ -262,6 +264,14 @@ func appendLoggingPolicyRows(rows []policyTableRow, p *policy.Policy, def *polic
)
}
// appendUploadPolicyRows appends the "Uploads:" section to rows: a header row
// followed by one row per upload setting, each showing the setting's value and
// the policy target that defined it.
func appendUploadPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow {
	snapshotsRow := policyTableRow{
		" Max parallel snapshots (server/UI):",
		valueOrNotSet(p.UploadPolicy.MaxParallelSnapshots),
		definitionPointToString(p.Target(), def.UploadPolicy.MaxParallelSnapshots),
	}

	fileReadsRow := policyTableRow{
		" Max parallel file reads:",
		valueOrNotSet(p.UploadPolicy.MaxParallelFileReads),
		definitionPointToString(p.Target(), def.UploadPolicy.MaxParallelFileReads),
	}

	return append(rows, policyTableRow{"Uploads:", "", ""}, snapshotsRow, fileReadsRow)
}
func appendSchedulingPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow {
rows = append(rows, policyTableRow{"Scheduling policy:", "", ""})

2
go.mod
View File

@@ -84,7 +84,7 @@ require (
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kopia/htmluibuild v0.0.0-20220307043644-8ec05c0034ed
github.com/kopia/htmluibuild v0.0.0-20220326183613-bbc499ed4dad
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect

2
go.sum
View File

@@ -400,6 +400,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kopia/htmluibuild v0.0.0-20220307043644-8ec05c0034ed h1:KJJqCeSl0FweVVrQNNBpe5sve4VKYTtzXe0EysHNAjg=
github.com/kopia/htmluibuild v0.0.0-20220307043644-8ec05c0034ed/go.mod h1:eWer4rx9P8lJo2eKc+Q7AZ1dE1x1hJNdkbDFPzMu1Hw=
github.com/kopia/htmluibuild v0.0.0-20220326183613-bbc499ed4dad h1:v+S3pBKXxFQ1PSBfPImVnIM+HHNCrB+5EU8jvTKFyZA=
github.com/kopia/htmluibuild v0.0.0-20220326183613-bbc499ed4dad/go.mod h1:eWer4rx9P8lJo2eKc+Q7AZ1dE1x1hJNdkbDFPzMu1Hw=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=

View File

@@ -144,7 +144,7 @@ func handlePolicyPut(ctx context.Context, rc requestContext) (interface{}, *apiE
return nil, internalServerError(err)
}
rc.srv.triggerRefreshSource(sourceInfo)
_ = rc.srv.Refresh(ctx)
return &serverapi.Empty{}, nil
}

View File

@@ -17,7 +17,6 @@
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
"github.com/kopia/kopia/internal/auth"
"github.com/kopia/kopia/internal/clock"
@@ -64,17 +63,27 @@ type apiRequestFunc func(ctx context.Context, rc requestContext) (interface{
// Server exposes simple HTTP API for programmatically accessing Kopia features.
type Server struct {
OnShutdown func(ctx context.Context) error
options Options
authenticator auth.Authenticator
authorizer auth.Authorizer
uploadSemaphore *semaphore.Weighted
OnShutdown func(ctx context.Context) error
options Options
authenticator auth.Authenticator
authorizer auth.Authorizer
initTaskMutex sync.Mutex
// +checklocks:initTaskMutex
initRepositoryTaskID string // non-empty - repository is currently being opened.
serverMutex sync.RWMutex
parallelSnapshotsMutex sync.Mutex
// +checklocks:parallelSnapshotsMutex
parallelSnapshotsChanged *sync.Cond // condition triggered on change to currentParallelSnapshots or maxParallelSnapshots
// +checklocks:parallelSnapshotsMutex
currentParallelSnapshots int
// +checklocks:parallelSnapshotsMutex
maxParallelSnapshots int
// +checklocks:serverMutex
rep repo.Repository
// +checklocks:serverMutex
@@ -476,23 +485,44 @@ func (s *Server) requestShutdown(ctx context.Context) {
}
}
// setMaxParallelSnapshotsLocked replaces the limit on concurrently running
// snapshot uploads and wakes every goroutine blocked on
// parallelSnapshotsChanged so that it re-evaluates the new limit.
//
// parallelSnapshotsMutex is acquired here, so the caller must not hold it.
// NOTE(review): the "Locked" suffix presumably refers to serverMutex being
// held by the caller (it is invoked from syncSourcesLocked) — confirm.
func (s *Server) setMaxParallelSnapshotsLocked(max int) {
s.parallelSnapshotsMutex.Lock()
defer s.parallelSnapshotsMutex.Unlock()
s.maxParallelSnapshots = max
// Broadcast rather than Signal: a changed limit can affect any number of waiters.
s.parallelSnapshotsChanged.Broadcast()
}
func (s *Server) beginUpload(ctx context.Context, src snapshot.SourceInfo) bool {
log(ctx).Debugf("waiting on semaphore to upload %v", src)
s.parallelSnapshotsMutex.Lock()
defer s.parallelSnapshotsMutex.Unlock()
if err := s.uploadSemaphore.Acquire(ctx, 1); err != nil {
log(ctx).Debugf("error acquiring semaphore to upload %v: %v", src, err)
for s.currentParallelSnapshots >= s.maxParallelSnapshots && ctx.Err() == nil {
log(ctx).Debugf("waiting on for parallel snapshot upload slot to be available %v", src)
s.parallelSnapshotsChanged.Wait()
}
if ctx.Err() != nil {
// context closed
return false
}
log(ctx).Debugf("entered semaphore to upload %v", src)
// at this point s.currentParallelSnapshots < s.maxParallelSnapshots and we are locked
s.currentParallelSnapshots++
return true
}
func (s *Server) endUpload(ctx context.Context, src snapshot.SourceInfo) {
s.parallelSnapshotsMutex.Lock()
defer s.parallelSnapshotsMutex.Unlock()
log(ctx).Debugf("finished uploading %v", src)
s.uploadSemaphore.Release(1)
s.currentParallelSnapshots--
// notify one of the waiters
s.parallelSnapshotsChanged.Signal()
}
func (s *Server) triggerRefreshSource(sourceInfo snapshot.SourceInfo) {
@@ -662,6 +692,18 @@ func (s *Server) syncSourcesLocked(ctx context.Context) error {
return errors.Wrap(err, "unable to list sources")
}
// user@host policy: its MaxParallelSnapshots setting (default 1) becomes the
// server-wide limit on concurrent snapshot uploads.
userhostPol, _, _, err := policy.GetEffectivePolicy(ctx, s.rep, snapshot.SourceInfo{
	UserName: s.rep.ClientOptions().Username,
	Host:     s.rep.ClientOptions().Hostname,
})
if err != nil {
	return errors.Wrap(err, "unable to get user policy")
}

// Only read the policy after the error check: on failure the returned policy
// must not be dereferenced, and the limit should stay at its previous value.
s.setMaxParallelSnapshotsLocked(userhostPol.UploadPolicy.MaxParallelSnapshots.OrDefault(1))
for _, ss := range snapshotSources {
sources[ss] = true
}
@@ -995,7 +1037,7 @@ func New(ctx context.Context, options *Options) (*Server, error) {
s := &Server{
options: *options,
sourceManagers: map[snapshot.SourceInfo]*sourceManager{},
uploadSemaphore: semaphore.NewWeighted(1),
maxParallelSnapshots: 1,
grpcServerState: makeGRPCServerState(options.MaxConcurrency),
authenticator: options.Authenticator,
authorizer: options.Authorizer,
@@ -1004,5 +1046,7 @@ func New(ctx context.Context, options *Options) (*Server, error) {
authCookieSigningKey: []byte(options.AuthCookieSigningKey),
}
s.parallelSnapshotsChanged = sync.NewCond(&s.parallelSnapshotsMutex)
return s, nil
}

View File

@@ -29,6 +29,7 @@ type Policy struct {
CompressionPolicy CompressionPolicy `json:"compression,omitempty"`
Actions ActionsPolicy `json:"actions,omitempty"`
LoggingPolicy LoggingPolicy `json:"logging,omitempty"`
UploadPolicy UploadPolicy `json:"upload,omitempty"`
NoParent bool `json:"noParent,omitempty"`
}
@@ -42,6 +43,7 @@ type Definition struct {
CompressionPolicy CompressionPolicyDefinition `json:"compression,omitempty"`
Actions ActionsPolicyDefinition `json:"actions,omitempty"`
LoggingPolicy LoggingPolicyDefinition `json:"logging,omitempty"`
UploadPolicy UploadPolicyDefinition `json:"upload,omitempty"`
}
func (p *Policy) String() string {
@@ -73,8 +75,16 @@ func (p *Policy) Target() snapshot.SourceInfo {
// ValidatePolicy returns error if the given policy is invalid.
// Currently, SchedulingPolicy and UploadPolicy are validated.
func ValidatePolicy(pol *Policy) error {
return ValidateSchedulingPolicy(pol.SchedulingPolicy)
func ValidatePolicy(si snapshot.SourceInfo, pol *Policy) error {
if err := ValidateSchedulingPolicy(pol.SchedulingPolicy); err != nil {
return errors.Wrap(err, "invalid scheduling policy")
}
if err := ValidateUploadPolicy(si, pol.UploadPolicy); err != nil {
return errors.Wrap(err, "invalid upload policy")
}
return nil
}
// validatePolicyPath validates that the provided policy path is valid and the path exists.

View File

@@ -158,7 +158,7 @@ func GetDefinedPolicy(ctx context.Context, rep repo.Repository, si snapshot.Sour
// SetPolicy sets the policy on a given source.
func SetPolicy(ctx context.Context, rep repo.RepositoryWriter, si snapshot.SourceInfo, pol *Policy) error {
if err := ValidatePolicy(pol); err != nil {
if err := ValidatePolicy(si, pol); err != nil {
return errors.Wrap(err, "failed to validate policy")
}

View File

@@ -22,6 +22,7 @@ func MergePolicies(policies []*Policy, si snapshot.SourceInfo) (*Policy, *Defini
merged.FilesPolicy.Merge(p.FilesPolicy, &def.FilesPolicy, p.Target())
merged.ErrorHandlingPolicy.Merge(p.ErrorHandlingPolicy, &def.ErrorHandlingPolicy, p.Target())
merged.SchedulingPolicy.Merge(p.SchedulingPolicy, &def.SchedulingPolicy, p.Target())
merged.UploadPolicy.Merge(p.UploadPolicy, &def.UploadPolicy, p.Target())
merged.CompressionPolicy.Merge(p.CompressionPolicy, &def.CompressionPolicy, p.Target())
merged.Actions.Merge(p.Actions, &def.Actions, p.Target())
merged.LoggingPolicy.Merge(p.LoggingPolicy, &def.LoggingPolicy, p.Target())
@@ -36,6 +37,7 @@ func MergePolicies(policies []*Policy, si snapshot.SourceInfo) (*Policy, *Defini
merged.FilesPolicy.Merge(defaultFilesPolicy, &def.FilesPolicy, GlobalPolicySourceInfo)
merged.ErrorHandlingPolicy.Merge(defaultErrorHandlingPolicy, &def.ErrorHandlingPolicy, GlobalPolicySourceInfo)
merged.SchedulingPolicy.Merge(defaultSchedulingPolicy, &def.SchedulingPolicy, GlobalPolicySourceInfo)
merged.UploadPolicy.Merge(defaultUploadPolicy, &def.UploadPolicy, GlobalPolicySourceInfo)
merged.CompressionPolicy.Merge(defaultCompressionPolicy, &def.CompressionPolicy, GlobalPolicySourceInfo)
merged.Actions.Merge(defaultActionsPolicy, &def.Actions, GlobalPolicySourceInfo)
merged.LoggingPolicy.Merge(defaultLoggingPolicy, &def.LoggingPolicy, GlobalPolicySourceInfo)

View File

@@ -50,6 +50,11 @@
defaultSchedulingPolicy = SchedulingPolicy{}
// defaultUploadPolicy is the upload policy merged in as the built-in global default.
defaultUploadPolicy = UploadPolicy{
MaxParallelSnapshots: newOptionalInt(1),
MaxParallelFileReads: nil, // unset: the uploader falls back to runtime.NumCPU()
}
// DefaultPolicy is a default policy returned by policy tree in absence of other policies.
DefaultPolicy = &Policy{
FilesPolicy: defaultFilesPolicy,
@@ -59,6 +64,7 @@
SchedulingPolicy: defaultSchedulingPolicy,
LoggingPolicy: defaultLoggingPolicy,
Actions: defaultActionsPolicy,
UploadPolicy: defaultUploadPolicy,
}
// DefaultDefinition provides the Definition for the default policy.

View File

@@ -0,0 +1,34 @@
package policy
import (
"github.com/pkg/errors"
"github.com/kopia/kopia/snapshot"
)
// UploadPolicy describes policy to apply when uploading snapshots.
// A nil field means the setting is not defined at this policy level.
type UploadPolicy struct {
// MaxParallelSnapshots limits how many snapshots may run in parallel
// (server/UI only). ValidateUploadPolicy rejects it on policies whose
// source has a path.
MaxParallelSnapshots *OptionalInt `json:"maxParallelSnapshots,omitempty"`
// MaxParallelFileReads limits the number of parallel file reads during an upload.
MaxParallelFileReads *OptionalInt `json:"maxParallelFileReads,omitempty"`
}
// UploadPolicyDefinition specifies which policy definition provided the value of a particular field.
// Each field mirrors the UploadPolicy field of the same name and holds the
// source whose policy supplied that field's value.
type UploadPolicyDefinition struct {
MaxParallelSnapshots snapshot.SourceInfo `json:"maxParallelSnapshots,omitempty"`
MaxParallelFileReads snapshot.SourceInfo `json:"maxParallelFileReads,omitempty"`
}
// Merge applies default values from the provided policy.
//
// Each field of p is merged with the corresponding field of src through
// mergeOptionalInt, which also receives the matching field of def and the
// source si that src belongs to.
// NOTE(review): presumably only fields still unset in p are filled in, and si
// is recorded in def for those fields — confirm against mergeOptionalInt.
func (p *UploadPolicy) Merge(src UploadPolicy, def *UploadPolicyDefinition, si snapshot.SourceInfo) {
mergeOptionalInt(&p.MaxParallelSnapshots, src.MaxParallelSnapshots, &def.MaxParallelSnapshots, si)
mergeOptionalInt(&p.MaxParallelFileReads, src.MaxParallelFileReads, &def.MaxParallelFileReads, si)
}
// ValidateUploadPolicy returns an error if MaxParallelSnapshots is set on a
// policy whose source has a non-empty path. The setting may only be defined
// on sources without a path: global, username@hostname or @hostname.
func ValidateUploadPolicy(si snapshot.SourceInfo, p UploadPolicy) error {
	// Valid when the source has no path or the setting is absent.
	if si.Path == "" || p.MaxParallelSnapshots == nil {
		return nil
	}

	return errors.Errorf("max parallel snapshots cannot be specified for paths, only global, username@hostname or @hostname")
}

View File

@@ -376,7 +376,7 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) {
// uploadFileWithCheckpointing uploads the specified File to the repository.
func (u *Uploader) uploadFileWithCheckpointing(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) {
par := u.effectiveParallelUploads()
par := u.effectiveParallelFileReads(pol)
if par == 1 {
par = 0
}
@@ -825,10 +825,12 @@ func (u *Uploader) maybeIgnoreCachedEntry(ctx context.Context, ent fs.Entry) fs.
return nil
}
func (u *Uploader) effectiveParallelUploads() int {
func (u *Uploader) effectiveParallelFileReads(pol *policy.Policy) int {
p := u.ParallelUploads
if p == 0 {
p = runtime.NumCPU()
max := pol.UploadPolicy.MaxParallelFileReads.OrDefault(runtime.NumCPU())
if p < 1 || p > max {
return max
}
return p
@@ -845,7 +847,7 @@ func (u *Uploader) processNonDirectories(
prevEntries []fs.Entries,
wg *workshare.AsyncGroup,
) error {
workerCount := u.effectiveParallelUploads()
workerCount := u.effectiveParallelFileReads(policyTree.EffectivePolicy())
var asyncWritesPerFile int
@@ -1236,7 +1238,6 @@ func NewUploader(r repo.RepositoryWriter) *Uploader {
return &Uploader{
repo: r,
Progress: &NullUploadProgress{},
ParallelUploads: 1,
EnableActions: r.ClientOptions().EnableActions,
CheckpointInterval: DefaultCheckpointInterval,
getTicker: time.Tick,
@@ -1273,17 +1274,18 @@ func (u *Uploader) Upload(
sourceInfo snapshot.SourceInfo,
previousManifests ...*snapshot.Manifest,
) (*snapshot.Manifest, error) {
uploadLog(ctx).Debugf("Uploading %v", sourceInfo)
u.Progress.UploadStarted()
defer u.Progress.UploadFinished()
parallel := u.effectiveParallelFileReads(policyTree.EffectivePolicy())
uploadLog(ctx).Debugf("Uploading %v with parallelism %v", sourceInfo, parallel)
s := &snapshot.Manifest{
Source: sourceInfo,
}
u.Progress.UploadStarted()
defer u.Progress.UploadFinished()
u.workerPool = workshare.NewPool(u.effectiveParallelUploads() - 1)
u.workerPool = workshare.NewPool(parallel - 1)
defer u.workerPool.Close()
u.stats = &snapshot.Stats{}