From 92683c5a5ec2cd736dcc4a56fe2c254d358958ef Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 26 Mar 2022 12:20:49 -0700 Subject: [PATCH] feat(snapshots): support for controlling upload parallelism via policies (#1850) --- cli/command_policy_set.go | 6 +++ cli/command_policy_set_upload.go | 31 ++++++++++++ cli/command_policy_set_upload_test.go | 46 ++++++++++++++++++ cli/command_policy_show.go | 10 ++++ go.mod | 2 +- go.sum | 2 + internal/server/api_policies.go | 2 +- internal/server/server.go | 68 ++++++++++++++++++++++----- snapshot/policy/policy.go | 14 +++++- snapshot/policy/policy_manager.go | 2 +- snapshot/policy/policy_merge.go | 2 + snapshot/policy/policy_tree.go | 6 +++ snapshot/policy/upload_policy.go | 34 ++++++++++++++ snapshot/snapshotfs/upload.go | 26 +++++----- 14 files changed, 222 insertions(+), 29 deletions(-) create mode 100644 cli/command_policy_set_upload.go create mode 100644 cli/command_policy_set_upload_test.go create mode 100644 snapshot/policy/upload_policy.go diff --git a/cli/command_policy_set.go b/cli/command_policy_set.go index f4e1e61fd..eb6159495 100644 --- a/cli/command_policy_set.go +++ b/cli/command_policy_set.go @@ -23,6 +23,7 @@ type commandPolicySet struct { policyLoggingFlags policyRetentionFlags policySchedulingFlags + policyUploadFlags } func (c *commandPolicySet) setup(svc appServices, parent commandParent) { @@ -37,6 +38,7 @@ func (c *commandPolicySet) setup(svc appServices, parent commandParent) { c.policyLoggingFlags.setup(cmd) c.policyRetentionFlags.setup(cmd) c.policySchedulingFlags.setup(cmd) + c.policyUploadFlags.setup(cmd) cmd.Action(svc.repositoryWriterAction(c.run)) } @@ -113,6 +115,10 @@ func (c *commandPolicySet) setPolicyFromFlags(ctx context.Context, p *policy.Pol return errors.Wrap(err, "actions policy") } + if err := c.setUploadPolicyFromFlags(ctx, &p.UploadPolicy, changeCount); err != nil { + return errors.Wrap(err, "upload policy") + } + // It's not really a list, just optional boolean, last one wins. for _, inherit := range c.inherit { *changeCount++ diff --git a/cli/command_policy_set_upload.go b/cli/command_policy_set_upload.go new file mode 100644 index 000000000..fcc0afd79 --- /dev/null +++ b/cli/command_policy_set_upload.go @@ -0,0 +1,31 @@ +package cli + +import ( + "context" + + "github.com/alecthomas/kingpin" + + "github.com/kopia/kopia/snapshot/policy" +) + +type policyUploadFlags struct { + maxParallelUploads string + maxParallelFileReads string +} + +func (c *policyUploadFlags) setup(cmd *kingpin.CmdClause) { + cmd.Flag("max-parallel-file-reads", "Maximum number of parallel file reads").StringVar(&c.maxParallelFileReads) + cmd.Flag("max-parallel-snapshots", "Maximum number of parallel snapshots (server, KopiaUI only)").StringVar(&c.maxParallelUploads) +} + +func (c *policyUploadFlags) setUploadPolicyFromFlags(ctx context.Context, up *policy.UploadPolicy, changeCount *int) error { + if err := applyOptionalInt(ctx, "max parallel file reads", &up.MaxParallelFileReads, c.maxParallelFileReads, changeCount); err != nil { + return err + } + + if err := applyOptionalInt(ctx, "max parallel snapshots", &up.MaxParallelSnapshots, c.maxParallelUploads, changeCount); err != nil { + return err + } + + return nil +} diff --git a/cli/command_policy_set_upload_test.go b/cli/command_policy_set_upload_test.go new file mode 100644 index 000000000..5a4c2dce4 --- /dev/null +++ b/cli/command_policy_set_upload_test.go @@ -0,0 +1,46 @@ +package cli_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/tests/testenv" +) + +func TestSetUploadPolicy(t *testing.T) { + e := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, testenv.NewInProcRunner(t)) + defer e.RunAndExpectSuccess(t, "repo", "disconnect") + + e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir) + + lines := e.RunAndExpectSuccess(t, "policy", "show", "--global") + lines = compressSpaces(lines) + require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 (defined for this target)") + require.Contains(t, lines, " Max parallel file reads: - (defined for this target)") + + // make some directory we'll be setting policy on + td := testutil.TempDirectory(t) + + lines = e.RunAndExpectSuccess(t, "policy", "show", td) + lines = compressSpaces(lines) + require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 inherited from (global)") + require.Contains(t, lines, " Max parallel file reads: - inherited from (global)") + + e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=7", "--max-parallel-file-reads=33") + + lines = e.RunAndExpectSuccess(t, "policy", "show", td) + lines = compressSpaces(lines) + + require.Contains(t, lines, " Max parallel snapshots (server/UI): 7 inherited from (global)") + require.Contains(t, lines, " Max parallel file reads: 33 inherited from (global)") + + e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=default", "--max-parallel-file-reads=default") + + lines = e.RunAndExpectSuccess(t, "policy", "show", td) + lines = compressSpaces(lines) + + require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 inherited from (global)") + require.Contains(t, lines, " Max parallel file reads: - inherited from (global)") +} diff --git a/cli/command_policy_show.go b/cli/command_policy_show.go index bb88ffb75..6f17738be 100644 --- a/cli/command_policy_show.go +++ b/cli/command_policy_show.go @@ -122,6 +122,8 @@ func printPolicy(out *textOutput, p *policy.Policy, def *policy.Definition) { rows = append(rows, policyTableRow{}) rows = appendSchedulingPolicyRows(rows, p, def) rows = append(rows, policyTableRow{}) + rows = appendUploadPolicyRows(rows, p, def) + rows = append(rows, policyTableRow{}) rows = appendCompressionPolicyRows(rows, p, def) rows = append(rows, policyTableRow{}) rows = appendActionsPolicyRows(rows, p, def) @@ -262,6 +264,14 @@ func appendLoggingPolicyRows(rows []policyTableRow, p *policy.Policy, def *polic ) } +func appendUploadPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow { + return append(rows, + policyTableRow{"Uploads:", "", ""}, + policyTableRow{" Max parallel snapshots (server/UI):", valueOrNotSet(p.UploadPolicy.MaxParallelSnapshots), definitionPointToString(p.Target(), def.UploadPolicy.MaxParallelSnapshots)}, + policyTableRow{" Max parallel file reads:", valueOrNotSet(p.UploadPolicy.MaxParallelFileReads), definitionPointToString(p.Target(), def.UploadPolicy.MaxParallelFileReads)}, + ) +} + func appendSchedulingPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow { rows = append(rows, policyTableRow{"Scheduling policy:", "", ""}) diff --git a/go.mod b/go.mod index 02bf4e27f..cb3e74cb6 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,7 @@ require ( github.com/googleapis/gax-go/v2 v2.1.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/kopia/htmluibuild v0.0.0-20220307043644-8ec05c0034ed + github.com/kopia/htmluibuild v0.0.0-20220326183613-bbc499ed4dad github.com/kr/fs v0.1.0 // indirect github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/mattn/go-isatty v0.0.14 // indirect diff --git a/go.sum b/go.sum index d81e22fab..c133e99c9 100644 --- a/go.sum +++ b/go.sum @@ -400,6 +400,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kopia/htmluibuild v0.0.0-20220307043644-8ec05c0034ed h1:KJJqCeSl0FweVVrQNNBpe5sve4VKYTtzXe0EysHNAjg= github.com/kopia/htmluibuild v0.0.0-20220307043644-8ec05c0034ed/go.mod h1:eWer4rx9P8lJo2eKc+Q7AZ1dE1x1hJNdkbDFPzMu1Hw= +github.com/kopia/htmluibuild v0.0.0-20220326183613-bbc499ed4dad h1:v+S3pBKXxFQ1PSBfPImVnIM+HHNCrB+5EU8jvTKFyZA= +github.com/kopia/htmluibuild v0.0.0-20220326183613-bbc499ed4dad/go.mod h1:eWer4rx9P8lJo2eKc+Q7AZ1dE1x1hJNdkbDFPzMu1Hw= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= diff --git a/internal/server/api_policies.go b/internal/server/api_policies.go index 1402199a1..1a6953ca5 100644 --- a/internal/server/api_policies.go +++ b/internal/server/api_policies.go @@ -144,7 +144,7 @@ func handlePolicyPut(ctx context.Context, rc requestContext) (interface{}, *apiE return nil, internalServerError(err) } - rc.srv.triggerRefreshSource(sourceInfo) + _ = rc.srv.Refresh(ctx) return &serverapi.Empty{}, nil } diff --git a/internal/server/server.go b/internal/server/server.go index 5510a9ea3..ad2bace29 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -17,7 +17,6 @@ "github.com/google/uuid" "github.com/gorilla/mux" "github.com/pkg/errors" - "golang.org/x/sync/semaphore" "github.com/kopia/kopia/internal/auth" "github.com/kopia/kopia/internal/clock" @@ -64,17 +63,27 @@ type apiRequestFunc func(ctx context.Context, rc requestContext) (interface{ // Server exposes simple HTTP API for programmatically accessing Kopia features. type Server struct { - OnShutdown func(ctx context.Context) error - options Options - authenticator auth.Authenticator - authorizer auth.Authorizer - uploadSemaphore *semaphore.Weighted + OnShutdown func(ctx context.Context) error + options Options + authenticator auth.Authenticator + authorizer auth.Authorizer initTaskMutex sync.Mutex // +checklocks:initTaskMutex initRepositoryTaskID string // non-empty - repository is currently being opened. serverMutex sync.RWMutex + + parallelSnapshotsMutex sync.Mutex + + // +checklocks:parallelSnapshotsMutex + parallelSnapshotsChanged *sync.Cond // condition triggered on change to currentParallelSnapshots or maxParallelSnapshots + + // +checklocks:parallelSnapshotsMutex + currentParallelSnapshots int + // +checklocks:parallelSnapshotsMutex + maxParallelSnapshots int + // +checklocks:serverMutex rep repo.Repository // +checklocks:serverMutex @@ -476,23 +485,44 @@ func (s *Server) requestShutdown(ctx context.Context) { } } +func (s *Server) setMaxParallelSnapshotsLocked(max int) { + s.parallelSnapshotsMutex.Lock() + defer s.parallelSnapshotsMutex.Unlock() + + s.maxParallelSnapshots = max + s.parallelSnapshotsChanged.Broadcast() +} + func (s *Server) beginUpload(ctx context.Context, src snapshot.SourceInfo) bool { - log(ctx).Debugf("waiting on semaphore to upload %v", src) + s.parallelSnapshotsMutex.Lock() + defer s.parallelSnapshotsMutex.Unlock() - if err := s.uploadSemaphore.Acquire(ctx, 1); err != nil { - log(ctx).Debugf("error acquiring semaphore to upload %v: %v", src, err) + for s.currentParallelSnapshots >= s.maxParallelSnapshots && ctx.Err() == nil { + log(ctx).Debugf("waiting on for parallel snapshot upload slot to be available %v", src) + s.parallelSnapshotsChanged.Wait() + } + if ctx.Err() != nil { + // context closed return false } - log(ctx).Debugf("entered semaphore to upload %v", src) + // at this point s.currentParallelSnapshots < s.maxParallelSnapshots and we are locked + s.currentParallelSnapshots++ return true } func (s *Server) endUpload(ctx context.Context, src snapshot.SourceInfo) { + s.parallelSnapshotsMutex.Lock() + defer s.parallelSnapshotsMutex.Unlock() + log(ctx).Debugf("finished uploading %v", src) - s.uploadSemaphore.Release(1) + + s.currentParallelSnapshots-- + + // notify one of the waiters + s.parallelSnapshotsChanged.Signal() } func (s *Server) triggerRefreshSource(sourceInfo snapshot.SourceInfo) { @@ -662,6 +692,18 @@ func (s *Server) syncSourcesLocked(ctx context.Context) error { return errors.Wrap(err, "unable to list sources") } + // user@host policy + userhostPol, _, _, err := policy.GetEffectivePolicy(ctx, s.rep, snapshot.SourceInfo{ + UserName: s.rep.ClientOptions().Username, + Host: s.rep.ClientOptions().Hostname, + }) + + s.setMaxParallelSnapshotsLocked(userhostPol.UploadPolicy.MaxParallelSnapshots.OrDefault(1)) + + if err != nil { + return errors.Wrap(err, "unable to get user policy") + } + for _, ss := range snapshotSources { sources[ss] = true } @@ -995,7 +1037,7 @@ func New(ctx context.Context, options *Options) (*Server, error) { s := &Server{ options: *options, sourceManagers: map[snapshot.SourceInfo]*sourceManager{}, - uploadSemaphore: semaphore.NewWeighted(1), + maxParallelSnapshots: 1, grpcServerState: makeGRPCServerState(options.MaxConcurrency), authenticator: options.Authenticator, authorizer: options.Authorizer, @@ -1004,5 +1046,7 @@ func New(ctx context.Context, options *Options) (*Server, error) { authCookieSigningKey: []byte(options.AuthCookieSigningKey), } + s.parallelSnapshotsChanged = sync.NewCond(&s.parallelSnapshotsMutex) + return s, nil } diff --git a/snapshot/policy/policy.go b/snapshot/policy/policy.go index 328e70bdc..ce4512c09 100644 --- a/snapshot/policy/policy.go +++ b/snapshot/policy/policy.go @@ -29,6 +29,7 @@ type Policy struct { CompressionPolicy CompressionPolicy `json:"compression,omitempty"` Actions ActionsPolicy `json:"actions,omitempty"` LoggingPolicy LoggingPolicy `json:"logging,omitempty"` + UploadPolicy UploadPolicy `json:"upload,omitempty"` NoParent bool `json:"noParent,omitempty"` } @@ -42,6 +43,7 @@ type Definition struct { CompressionPolicy CompressionPolicyDefinition `json:"compression,omitempty"` Actions ActionsPolicyDefinition `json:"actions,omitempty"` LoggingPolicy LoggingPolicyDefinition `json:"logging,omitempty"` + UploadPolicy UploadPolicyDefinition `json:"upload,omitempty"` } func (p *Policy) String() string { @@ -73,8 +75,16 @@ func (p *Policy) Target() snapshot.SourceInfo { // ValidatePolicy returns error if the given policy is invalid. // Currently, only SchedulingPolicy is validated. -func ValidatePolicy(pol *Policy) error { - return ValidateSchedulingPolicy(pol.SchedulingPolicy) +func ValidatePolicy(si snapshot.SourceInfo, pol *Policy) error { + if err := ValidateSchedulingPolicy(pol.SchedulingPolicy); err != nil { + return errors.Wrap(err, "invalid scheduling policy") + } + + if err := ValidateUploadPolicy(si, pol.UploadPolicy); err != nil { + return errors.Wrap(err, "invalid upload policy") + } + + return nil } // validatePolicyPath validates that the provided policy path is valid and the path exists. diff --git a/snapshot/policy/policy_manager.go b/snapshot/policy/policy_manager.go index e2d7b85b6..5e1ea485a 100644 --- a/snapshot/policy/policy_manager.go +++ b/snapshot/policy/policy_manager.go @@ -158,7 +158,7 @@ func GetDefinedPolicy(ctx context.Context, rep repo.Repository, si snapshot.Sour // SetPolicy sets the policy on a given source. func SetPolicy(ctx context.Context, rep repo.RepositoryWriter, si snapshot.SourceInfo, pol *Policy) error { - if err := ValidatePolicy(pol); err != nil { + if err := ValidatePolicy(si, pol); err != nil { return errors.Wrap(err, "failed to validate policy") } diff --git a/snapshot/policy/policy_merge.go b/snapshot/policy/policy_merge.go index 0b4d062d1..e3bb060bf 100644 --- a/snapshot/policy/policy_merge.go +++ b/snapshot/policy/policy_merge.go @@ -22,6 +22,7 @@ func MergePolicies(policies []*Policy, si snapshot.SourceInfo) (*Policy, *Defini merged.FilesPolicy.Merge(p.FilesPolicy, &def.FilesPolicy, p.Target()) merged.ErrorHandlingPolicy.Merge(p.ErrorHandlingPolicy, &def.ErrorHandlingPolicy, p.Target()) merged.SchedulingPolicy.Merge(p.SchedulingPolicy, &def.SchedulingPolicy, p.Target()) + merged.UploadPolicy.Merge(p.UploadPolicy, &def.UploadPolicy, p.Target()) merged.CompressionPolicy.Merge(p.CompressionPolicy, &def.CompressionPolicy, p.Target()) merged.Actions.Merge(p.Actions, &def.Actions, p.Target()) merged.LoggingPolicy.Merge(p.LoggingPolicy, &def.LoggingPolicy, p.Target()) @@ -36,6 +37,7 @@ func MergePolicies(policies []*Policy, si snapshot.SourceInfo) (*Policy, *Defini merged.FilesPolicy.Merge(defaultFilesPolicy, &def.FilesPolicy, GlobalPolicySourceInfo) merged.ErrorHandlingPolicy.Merge(defaultErrorHandlingPolicy, &def.ErrorHandlingPolicy, GlobalPolicySourceInfo) merged.SchedulingPolicy.Merge(defaultSchedulingPolicy, &def.SchedulingPolicy, GlobalPolicySourceInfo) + merged.UploadPolicy.Merge(defaultUploadPolicy, &def.UploadPolicy, GlobalPolicySourceInfo) merged.CompressionPolicy.Merge(defaultCompressionPolicy, &def.CompressionPolicy, GlobalPolicySourceInfo) merged.Actions.Merge(defaultActionsPolicy, &def.Actions, GlobalPolicySourceInfo) merged.LoggingPolicy.Merge(defaultLoggingPolicy, &def.LoggingPolicy, GlobalPolicySourceInfo) diff --git a/snapshot/policy/policy_tree.go b/snapshot/policy/policy_tree.go index a83989a62..ae800de33 100644 --- a/snapshot/policy/policy_tree.go +++ b/snapshot/policy/policy_tree.go @@ -50,6 +50,11 @@ defaultSchedulingPolicy = SchedulingPolicy{} + defaultUploadPolicy = UploadPolicy{ + MaxParallelSnapshots: newOptionalInt(1), + MaxParallelFileReads: nil, // defaults to runtime.NumCPUs() + } + // DefaultPolicy is a default policy returned by policy tree in absence of other policies. DefaultPolicy = &Policy{ FilesPolicy: defaultFilesPolicy, @@ -59,6 +64,7 @@ SchedulingPolicy: defaultSchedulingPolicy, LoggingPolicy: defaultLoggingPolicy, Actions: defaultActionsPolicy, + UploadPolicy: defaultUploadPolicy, } // DefaultDefinition provides the Definition for the default policy. diff --git a/snapshot/policy/upload_policy.go b/snapshot/policy/upload_policy.go new file mode 100644 index 000000000..5b322e045 --- /dev/null +++ b/snapshot/policy/upload_policy.go @@ -0,0 +1,34 @@ +package policy + +import ( + "github.com/pkg/errors" + + "github.com/kopia/kopia/snapshot" +) + +// UploadPolicy describes policy to apply when uploading snapshots. +type UploadPolicy struct { + MaxParallelSnapshots *OptionalInt `json:"maxParallelSnapshots,omitempty"` + MaxParallelFileReads *OptionalInt `json:"maxParallelFileReads,omitempty"` +} + +// UploadPolicyDefinition specifies which policy definition provided the value of a particular field. +type UploadPolicyDefinition struct { + MaxParallelSnapshots snapshot.SourceInfo `json:"maxParallelSnapshots,omitempty"` + MaxParallelFileReads snapshot.SourceInfo `json:"maxParallelFileReads,omitempty"` +} + +// Merge applies default values from the provided policy. +func (p *UploadPolicy) Merge(src UploadPolicy, def *UploadPolicyDefinition, si snapshot.SourceInfo) { + mergeOptionalInt(&p.MaxParallelSnapshots, src.MaxParallelSnapshots, &def.MaxParallelSnapshots, si) + mergeOptionalInt(&p.MaxParallelFileReads, src.MaxParallelFileReads, &def.MaxParallelFileReads, si) +} + +// ValidateUploadPolicy returns an error if manual field is set along with Upload fields. +func ValidateUploadPolicy(si snapshot.SourceInfo, p UploadPolicy) error { + if si.Path != "" && p.MaxParallelSnapshots != nil { + return errors.Errorf("max parallel snapshots cannot be specified for paths, only global, username@hostname or @hostname") + } + + return nil +} diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index c8180a7bc..25b27d81b 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -376,7 +376,7 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) { // uploadFileWithCheckpointing uploads the specified File to the repository. func (u *Uploader) uploadFileWithCheckpointing(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) { - par := u.effectiveParallelUploads() + par := u.effectiveParallelFileReads(pol) if par == 1 { par = 0 } @@ -825,10 +825,12 @@ func (u *Uploader) maybeIgnoreCachedEntry(ctx context.Context, ent fs.Entry) fs. return nil } -func (u *Uploader) effectiveParallelUploads() int { +func (u *Uploader) effectiveParallelFileReads(pol *policy.Policy) int { p := u.ParallelUploads - if p == 0 { - p = runtime.NumCPU() + max := pol.UploadPolicy.MaxParallelFileReads.OrDefault(runtime.NumCPU()) + + if p < 1 || p > max { + return max } return p @@ -845,7 +847,7 @@ func (u *Uploader) processNonDirectories( prevEntries []fs.Entries, wg *workshare.AsyncGroup, ) error { - workerCount := u.effectiveParallelUploads() + workerCount := u.effectiveParallelFileReads(policyTree.EffectivePolicy()) var asyncWritesPerFile int @@ -1236,7 +1238,6 @@ func NewUploader(r repo.RepositoryWriter) *Uploader { return &Uploader{ repo: r, Progress: &NullUploadProgress{}, - ParallelUploads: 1, EnableActions: r.ClientOptions().EnableActions, CheckpointInterval: DefaultCheckpointInterval, getTicker: time.Tick, @@ -1273,17 +1274,18 @@ func (u *Uploader) Upload( sourceInfo snapshot.SourceInfo, previousManifests ...*snapshot.Manifest, ) (*snapshot.Manifest, error) { - uploadLog(ctx).Debugf("Uploading %v", sourceInfo) + u.Progress.UploadStarted() + defer u.Progress.UploadFinished() + + parallel := u.effectiveParallelFileReads(policyTree.EffectivePolicy()) + + uploadLog(ctx).Debugf("Uploading %v with parallelism %v", sourceInfo, parallel) s := &snapshot.Manifest{ Source: sourceInfo, } - u.Progress.UploadStarted() - - defer u.Progress.UploadFinished() - - u.workerPool = workshare.NewPool(u.effectiveParallelUploads() - 1) + u.workerPool = workshare.NewPool(parallel - 1) defer u.workerPool.Close() u.stats = &snapshot.Stats{}