diff --git a/changelog/unreleased/postprocessing-events.md b/changelog/unreleased/postprocessing-events.md index 7b32efcc31..86394e3141 100644 --- a/changelog/unreleased/postprocessing-events.md +++ b/changelog/unreleased/postprocessing-events.md @@ -2,4 +2,5 @@ Bugfix: increase event processing workers We increased the number of go routines that pull events from the queue to three and made the number off workers configurable. Furthermore, the postprocessing delay no longer introduces a sleep that slows down pulling of events, but asynchronously triggers the next step. +https://github.com/owncloud/ocis/pull/10385 https://github.com/owncloud/ocis/pull/10368 diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 70918e8108..0628fd73f1 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" @@ -65,12 +66,14 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { - // Spawn workers that'll concurrently work the queue + wg := sync.WaitGroup{} + for i := 0; i < pps.c.Workers; i++ { - go (func() { + wg.Add(1) + go func() { + defer wg.Done() for e := range pps.events { - err := pps.processEvent(e) - if err != nil { + if err := pps.processEvent(e); err != nil { switch { case errors.Is(err, ErrFatal): pps.log.Fatal().Err(err).Msg("fatal error - exiting") @@ -81,8 +84,10 @@ func (pps *PostprocessingService) Run() error { } } } - })() + }() } + wg.Wait() + return nil }