diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index 1ce3962357..f5274e9875 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -75,9 +75,12 @@ func (pp *Postprocessing) CurrentStep() interface{} { } // Delay will sleep the configured time then continue -func (pp *Postprocessing) Delay() interface{} { - time.Sleep(pp.config.Delayprocessing) - return pp.next(events.PPStepDelay) +func (pp *Postprocessing) Delay(f func(next interface{})) { + next := pp.next(events.PPStepDelay) + go func() { + time.Sleep(pp.config.Delayprocessing) + f(next) + }() } // BackoffDuration calculates the duration for exponential backoff based on the number of failures. diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 13441567d9..ec29966ac7 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -149,7 +149,11 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") return fmt.Errorf("%w: cannot get upload", ErrEvent) } - next = pp.Delay() + pp.Delay(func(next interface{}) { + if err := events.Publish(ctx, pps.pub, next); err != nil { + pps.log.Error().Err(err).Msg("cannot publish event") + } + }) case events.UploadReady: if ev.Failed { // the upload failed - let's keep it around for a while - but mark it as finished