Merge pull request #10385 from owncloud/fix-postprocessing

fix postprocessing events
This commit is contained in:
Jörn Friedrich Dreyer
2024-10-22 17:08:12 +02:00
committed by GitHub
2 changed files with 11 additions and 5 deletions

View File

@@ -2,4 +2,5 @@ Bugfix: increase event processing workers
We increased the number of go routines that pull events from the queue to three and made the number off workers configurable. Furthermore, the postprocessing delay no longer introduces a sleep that slows down pulling of events, but asynchronously triggers the next step.
https://github.com/owncloud/ocis/pull/10385
https://github.com/owncloud/ocis/pull/10368

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
@@ -65,12 +66,14 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
// Spawn workers that'll concurrently work the queue
wg := sync.WaitGroup{}
for i := 0; i < pps.c.Workers; i++ {
go (func() {
wg.Add(1)
go func() {
defer wg.Done()
for e := range pps.events {
err := pps.processEvent(e)
if err != nil {
if err := pps.processEvent(e); err != nil {
switch {
case errors.Is(err, ErrFatal):
pps.log.Fatal().Err(err).Msg("fatal error - exiting")
@@ -81,8 +84,10 @@ func (pps *PostprocessingService) Run() error {
}
}
}
})()
}()
}
wg.Wait()
return nil
}