From 72e3e54fe05c5d766d8fa72c7fae5104e3cde874 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 22 Oct 2024 10:04:56 +0200 Subject: [PATCH 1/3] make delay only affect the step MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../postprocessing/pkg/postprocessing/postprocessing.go | 9 ++++++--- services/postprocessing/pkg/service/service.go | 6 +++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index 1ce3962357..f5274e9875 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -75,9 +75,12 @@ func (pp *Postprocessing) CurrentStep() interface{} { } // Delay will sleep the configured time then continue -func (pp *Postprocessing) Delay() interface{} { - time.Sleep(pp.config.Delayprocessing) - return pp.next(events.PPStepDelay) +func (pp *Postprocessing) Delay(f func(next interface{})) { + next := pp.next(events.PPStepDelay) + go func() { + time.Sleep(pp.config.Delayprocessing) + f(next) + }() } // BackoffDuration calculates the duration for exponential backoff based on the number of failures. diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 13441567d9..ec29966ac7 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -149,7 +149,11 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") return fmt.Errorf("%w: cannot get upload", ErrEvent) } - next = pp.Delay() + pp.Delay(func(next interface{}) { + if err := events.Publish(ctx, pps.pub, next); err != nil { + pps.log.Error().Err(err).Msg("cannot publish event") + } + }) case events.UploadReady: if ev.Failed { // the upload failed - let's keep it around for a while - but mark it as finished From d6958f3a3eda3d27b8828a2d6cee6c51936b7c1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 22 Oct 2024 10:13:16 +0200 Subject: [PATCH 2/3] introduce event processing workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- services/postprocessing/pkg/config/config.go | 1 + .../pkg/config/defaults/defaultconfig.go | 1 + .../postprocessing/pkg/service/service.go | 27 +++++++++++-------- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/services/postprocessing/pkg/config/config.go b/services/postprocessing/pkg/config/config.go index 70f0723702..3fb9559d81 100644 --- a/services/postprocessing/pkg/config/config.go +++ b/services/postprocessing/pkg/config/config.go @@ -35,6 +35,7 @@ type Postprocessing struct { // Events combines the configuration options for the event bus. type Events struct { + Workers int `yaml:"workers" env:"POSTPROCESSING_EVENTS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"%%NEXT%%"` Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;POSTPROCESSING_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"pre5.0"` Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;POSTPROCESSING_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"pre5.0"` diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go index 51a9bb6112..a649440b9f 100644 --- a/services/postprocessing/pkg/config/defaults/defaultconfig.go +++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go @@ -28,6 +28,7 @@ func DefaultConfig() *config.Config { }, Postprocessing: config.Postprocessing{ Events: config.Events{ + Workers: 3, Endpoint: "127.0.0.1:9233", Cluster: "ocis-cluster", }, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index ec29966ac7..1bc9065b0a 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -65,18 +65,23 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { - for e := range pps.events { - err := pps.processEvent(e) - if err != nil { - switch { - case errors.Is(err, ErrFatal): - return err - case errors.Is(err, ErrEvent): - continue - default: - pps.log.Fatal().Err(err).Msg("unknown error - exiting") + // Spawn workers that'll concurrently work the queue + for i := 0; i < 3; i++ { + go (func() { + for e := range pps.events { + err := pps.processEvent(e) + if err != nil { + switch { + case errors.Is(err, ErrFatal): + pps.log.Fatal().Err(err).Msg("fatal error - exiting") + case errors.Is(err, ErrEvent): + pps.log.Error().Err(err).Msg("continuing") + default: + pps.log.Fatal().Err(err).Msg("unknown error - exiting") + } + } } - } + })() } return nil } From 91dc7699d8720e8b2a9a5efe61d701ffcd630807 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 22 Oct 2024 10:21:02 +0200 Subject: [PATCH 3/3] postprocessing event workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/postprocessing-events.md | 5 +++++ services/postprocessing/pkg/config/config.go | 5 +++-- services/postprocessing/pkg/config/defaults/defaultconfig.go | 2 +- services/postprocessing/pkg/service/service.go | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) create mode 100644 changelog/unreleased/postprocessing-events.md diff --git a/changelog/unreleased/postprocessing-events.md b/changelog/unreleased/postprocessing-events.md new file mode 100644 index 0000000000..7b32efcc31 --- /dev/null +++ b/changelog/unreleased/postprocessing-events.md @@ -0,0 +1,5 @@ +Bugfix: increase event processing workers + +We increased the number of go routines that pull events from the queue to three and made the number off workers configurable. Furthermore, the postprocessing delay no longer introduces a sleep that slows down pulling of events, but asynchronously triggers the next step. + +https://github.com/owncloud/ocis/pull/10368 diff --git a/services/postprocessing/pkg/config/config.go b/services/postprocessing/pkg/config/config.go index 3fb9559d81..8919df2f7a 100644 --- a/services/postprocessing/pkg/config/config.go +++ b/services/postprocessing/pkg/config/config.go @@ -25,7 +25,9 @@ type Config struct { // Postprocessing defines the config options for the postprocessing service. type Postprocessing struct { - Events Events `yaml:"events"` + Events Events `yaml:"events"` + Workers int `yaml:"workers" env:"POSTPROCESSING_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"6.7"` + Steps []string `yaml:"steps" env:"POSTPROCESSING_STEPS" desc:"A list of postprocessing steps processed in order of their appearance. Currently supported values by the system are: 'virusscan', 'policies' and 'delay'. Custom steps are allowed. See the documentation for instructions. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"` Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"After uploading a file but before making it available for download, a delay step can be added. Intended for developing purposes only. If a duration is set but the keyword 'delay' is not explicitely added to 'POSTPROCESSING_STEPS', the delay step will be processed as last step. In such a case, a log entry will be written on service startup to remind the admin about that situation. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"` @@ -35,7 +37,6 @@ type Postprocessing struct { // Events combines the configuration options for the event bus. type Events struct { - Workers int `yaml:"workers" env:"POSTPROCESSING_EVENTS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"%%NEXT%%"` Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;POSTPROCESSING_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"pre5.0"` Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;POSTPROCESSING_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"pre5.0"` diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go index a649440b9f..4f00a1ff9e 100644 --- a/services/postprocessing/pkg/config/defaults/defaultconfig.go +++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go @@ -28,10 +28,10 @@ func DefaultConfig() *config.Config { }, Postprocessing: config.Postprocessing{ Events: config.Events{ - Workers: 3, Endpoint: "127.0.0.1:9233", Cluster: "ocis-cluster", }, + Workers: 3, RetryBackoffDuration: 5 * time.Second, MaxRetries: 14, }, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 1bc9065b0a..70918e8108 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -66,7 +66,7 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { // Spawn workers that'll concurrently work the queue - for i := 0; i < 3; i++ { + for i := 0; i < pps.c.Workers; i++ { go (func() { for e := range pps.events { err := pps.processEvent(e)