From 91dc7699d8720e8b2a9a5efe61d701ffcd630807 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 22 Oct 2024 10:21:02 +0200 Subject: [PATCH] postprocessing event workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/postprocessing-events.md | 5 +++++ services/postprocessing/pkg/config/config.go | 5 +++-- services/postprocessing/pkg/config/defaults/defaultconfig.go | 2 +- services/postprocessing/pkg/service/service.go | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) create mode 100644 changelog/unreleased/postprocessing-events.md diff --git a/changelog/unreleased/postprocessing-events.md b/changelog/unreleased/postprocessing-events.md new file mode 100644 index 0000000000..7b32efcc31 --- /dev/null +++ b/changelog/unreleased/postprocessing-events.md @@ -0,0 +1,5 @@ +Bugfix: increase event processing workers + +We increased the number of go routines that pull events from the queue to three and made the number off workers configurable. Furthermore, the postprocessing delay no longer introduces a sleep that slows down pulling of events, but asynchronously triggers the next step. + +https://github.com/owncloud/ocis/pull/10368 diff --git a/services/postprocessing/pkg/config/config.go b/services/postprocessing/pkg/config/config.go index 3fb9559d81..8919df2f7a 100644 --- a/services/postprocessing/pkg/config/config.go +++ b/services/postprocessing/pkg/config/config.go @@ -25,7 +25,9 @@ type Config struct { // Postprocessing defines the config options for the postprocessing service. type Postprocessing struct { - Events Events `yaml:"events"` + Events Events `yaml:"events"` + Workers int `yaml:"workers" env:"POSTPROCESSING_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"6.7"` + Steps []string `yaml:"steps" env:"POSTPROCESSING_STEPS" desc:"A list of postprocessing steps processed in order of their appearance. Currently supported values by the system are: 'virusscan', 'policies' and 'delay'. Custom steps are allowed. See the documentation for instructions. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"` Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"After uploading a file but before making it available for download, a delay step can be added. Intended for developing purposes only. If a duration is set but the keyword 'delay' is not explicitely added to 'POSTPROCESSING_STEPS', the delay step will be processed as last step. In such a case, a log entry will be written on service startup to remind the admin about that situation. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"` @@ -35,7 +37,6 @@ type Postprocessing struct { // Events combines the configuration options for the event bus. type Events struct { - Workers int `yaml:"workers" env:"POSTPROCESSING_EVENTS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"%%NEXT%%"` Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;POSTPROCESSING_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"pre5.0"` Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;POSTPROCESSING_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"pre5.0"` diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go index a649440b9f..4f00a1ff9e 100644 --- a/services/postprocessing/pkg/config/defaults/defaultconfig.go +++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go @@ -28,10 +28,10 @@ func DefaultConfig() *config.Config { }, Postprocessing: config.Postprocessing{ Events: config.Events{ - Workers: 3, Endpoint: "127.0.0.1:9233", Cluster: "ocis-cluster", }, + Workers: 3, RetryBackoffDuration: 5 * time.Second, MaxRetries: 14, }, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 1bc9065b0a..70918e8108 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -66,7 +66,7 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { // Spawn workers that'll concurrently work the queue - for i := 0; i < 3; i++ { + for i := 0; i < pps.c.Workers; i++ { go (func() { for e := range pps.events { err := pps.processEvent(e)