package service

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/opencloud-eu/opencloud/pkg/generators"
	"github.com/opencloud-eu/opencloud/pkg/log"
	"github.com/opencloud-eu/opencloud/pkg/version"
	"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
	"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/metrics"
	"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/postprocessing"
	ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx"
	"github.com/opencloud-eu/reva/v2/pkg/events"
	"github.com/opencloud-eu/reva/v2/pkg/events/raw"
	"github.com/opencloud-eu/reva/v2/pkg/events/stream"
	"github.com/opencloud-eu/reva/v2/pkg/utils"
	"go-micro.dev/v4/store"
	"go.opentelemetry.io/otel/trace"
)

// PostprocessingService is an instance of the service handling postprocessing of files
type PostprocessingService struct {
	ctx     context.Context
	log     log.Logger
	events  <-chan raw.Event
	pub     events.Publisher
	steps   []events.Postprocessingstep
	store   store.Store
	c       config.Postprocessing
	tp      trace.TracerProvider
	metrics *metrics.Metrics
	stopCh  chan struct{}
	stopped atomic.Bool
}

var (
	// ErrFatal is returned when a fatal error occurs and we want to exit.
	ErrFatal = errors.New("fatal error")
	// ErrEvent is returned when something went wrong with a specific event.
	ErrEvent = errors.New("event error")
	// ErrNotFound is returned when a postprocessing entry is not found in the store.
	ErrNotFound = errors.New("postprocessing not found")
)
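
// The sentinel errors above drive how Run reacts to a failed event: anything
// wrapped in ErrEvent is logged and the worker keeps consuming, while ErrFatal
// (or any unclassified error) makes the worker log at Fatal level and exit.
// ErrNotFound is used internally by getPP when the store holds no record for
// an upload ID.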

// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) {
	connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
	pub, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{
		Endpoint:             cfg.Postprocessing.Events.Endpoint,
		Cluster:              cfg.Postprocessing.Events.Cluster,
		EnableTLS:            cfg.Postprocessing.Events.EnableTLS,
		TLSInsecure:          cfg.Postprocessing.Events.TLSInsecure,
		TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate,
		AuthUsername:         cfg.Postprocessing.Events.AuthUsername,
		AuthPassword:         cfg.Postprocessing.Events.AuthPassword,
	})
	if err != nil {
		return nil, err
	}

	raw, err := raw.FromConfig(ctx, connName, raw.Config{
		Endpoint:             cfg.Postprocessing.Events.Endpoint,
		Cluster:              cfg.Postprocessing.Events.Cluster,
		EnableTLS:            cfg.Postprocessing.Events.EnableTLS,
		TLSInsecure:          cfg.Postprocessing.Events.TLSInsecure,
		TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate,
		AuthUsername:         cfg.Postprocessing.Events.AuthUsername,
		AuthPassword:         cfg.Postprocessing.Events.AuthPassword,
		MaxAckPending:        cfg.Postprocessing.Events.MaxAckPending,
		AckWait:              cfg.Postprocessing.Events.AckWait,
	})
	if err != nil {
		return nil, err
	}

	evs, err := raw.Consume("postprocessing-pull",
		events.BytesReceived{},
		events.StartPostprocessingStep{},
		events.UploadReady{},
		events.PostprocessingStepFinished{},
		events.ResumePostprocessing{})
	if err != nil {
		return nil, err
	}

	m := metrics.New()
	m.BuildInfo.WithLabelValues(version.GetString()).Set(1)
	monitorMetrics(raw, "postprocessing-pull", m, logger)

	return &PostprocessingService{
		ctx:     ctx,
		log:     logger,
		events:  evs,
		pub:     pub,
		steps:   getSteps(cfg.Postprocessing),
		store:   sto,
		c:       cfg.Postprocessing,
		tp:      tp,
		metrics: m,
		stopCh:  make(chan struct{}, 1),
	}, nil
}
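
// A minimal lifecycle sketch (illustrative only, not part of this file): the
// constructor above opens a NATS publisher plus a raw JetStream consumer bound
// to the "postprocessing-pull" consumer; the returned service is then typically
// driven by a runner that calls Run in a goroutine and Close on shutdown:
//
//	svc, err := NewPostprocessingService(ctx, logger, sto, tp, cfg)
//	if err != nil {
//		return err
//	}
//	go func() { _ = svc.Run() }()
//	// ... on shutdown:
//	svc.Close() // Run returns once the workers notice the stop signal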

// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
	wg := sync.WaitGroup{}

	for range pps.c.Workers {
		wg.Add(1)
		go func() {
			defer wg.Done()

		EventLoop:
			for {
				select {
				case <-pps.stopCh:
					// stop requested
					// TODO: we might need a way to unsubscribe from the event channel, otherwise
					// we'll be leaking a goroutine in reva that will be stuck waiting for
					// someone to read from the event channel.
					// Note: the redis implementation seems to have a timeout, so the goroutine
					// will exit if there is nobody processing the events and the timeout
					// is reached. The behavior is unclear with natsjs.
					break EventLoop
				case e, ok := <-pps.events:
					if !ok {
						// event channel is closed, so nothing more to do
						break EventLoop
					}

					err := pps.processEvent(e)
					if err != nil {
						switch {
						case errors.Is(err, ErrFatal):
							pps.log.Fatal().Err(err).Msg("fatal error - exiting")
						case errors.Is(err, ErrEvent):
							pps.log.Error().Err(err).Msg("continuing")
						default:
							pps.log.Fatal().Err(err).Msg("unknown error - exiting")
						}
					}

					if pps.stopped.Load() {
						// if stopped, don't process any more events
						break EventLoop
					}
				}
			}
		}()
	}

	wg.Wait()

	return nil
}

// Close makes the postprocessing service stop processing, so the `Run`
// method can finish.
// TODO: Underlying services can't be stopped. This means that some goroutines
// will get stuck trying to push events through a channel nobody is reading
// from, so resources won't be freed and there will be memory leaks. For now,
// if the service is stopped, you should close the app soon after.
func (pps *PostprocessingService) Close() {
	if pps.stopped.CompareAndSwap(false, true) {
		close(pps.stopCh)
	}
}
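
// processEvent dispatches a single raw event to the postprocessing state
// machine: BytesReceived creates a new postprocessing run and kicks off its
// first step, PostprocessingStepFinished records a step result and either
// advances to the next step or schedules a retry after a backoff,
// StartPostprocessingStep is only handled for the built-in delay step,
// UploadReady finalizes the run (keeping failed runs around, deleting
// successful ones), and ResumePostprocessing re-triggers stalled runs.
// The event is acked unless persisting the state fails, in which case it is
// left un-acked so it can be redelivered.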
func (pps *PostprocessingService) processEvent(e raw.Event) error {
	pps.log.Debug().Str("Type", e.Type).Str("ID", e.ID).Msg("processing event received")

	var (
		next interface{}
		pp   *postprocessing.Postprocessing
		err  error
	)

	ctx := e.GetTraceContext(pps.ctx)
	ctx, span := pps.tp.Tracer("postprocessing").Start(ctx, "processEvent")
	defer span.End()

	ackEvent := true
	defer func() {
		if ackEvent {
			if err := e.Ack(); err != nil {
				pps.log.Error().Err(err).Msg("unable to ack event")
			}
		}
	}()

	switch ev := e.Event.Event.(type) {
	case events.BytesReceived:
		pp = &postprocessing.Postprocessing{
			ID:                ev.UploadID,
			URL:               ev.URL,
			User:              ev.ExecutingUser,
			Filename:          ev.Filename,
			Filesize:          ev.Filesize,
			ResourceID:        ev.ResourceID,
			Steps:             pps.steps,
			InitiatorID:       e.InitiatorID,
			ImpersonatingUser: ev.ImpersonatingUser,
			StartTime:         time.Now(),
		}
		next = pp.Init(ev)
	case events.PostprocessingStepFinished:
		if ev.UploadID == "" {
			// no current upload - this was an on-demand scan
			return nil
		}
		pp, err = pps.getPP(pps.store, ev.UploadID)
		if err != nil {
			pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
			return fmt.Errorf("%w: cannot get upload", ErrEvent)
		}
		next = pp.NextStep(ev)

		switch pp.Status.Outcome {
		case events.PPOutcomeRetry:
			// schedule retry
			backoff := pp.BackoffDuration()
			go func() {
				time.Sleep(backoff)
				retryEvent := events.StartPostprocessingStep{
					UploadID:          pp.ID,
					URL:               pp.URL,
					ExecutingUser:     pp.User,
					Filename:          pp.Filename,
					Filesize:          pp.Filesize,
					ResourceID:        pp.ResourceID,
					StepToStart:       pp.Status.CurrentStep,
					ImpersonatingUser: pp.ImpersonatingUser,
				}
				err := events.Publish(ctx, pps.pub, retryEvent)
				if err != nil {
					pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish StartPostprocessingStep retry event")
				}
			}()
		}
	case events.StartPostprocessingStep:
		if ev.StepToStart != events.PPStepDelay {
			return nil
		}
		pp, err = pps.getPP(pps.store, ev.UploadID)
		if err != nil {
			pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
			return fmt.Errorf("%w: cannot get upload", ErrEvent)
		}
		pp.Delay(func(next interface{}) {
			if err := events.Publish(ctx, pps.pub, next); err != nil {
				pps.log.Error().Err(err).Msg("cannot publish event")
			}
		})
	case events.UploadReady:
		pp, err = pps.getPP(pps.store, ev.UploadID)
		if err != nil {
			pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
			return fmt.Errorf("%w: cannot get upload", ErrEvent)
		}

		if ev.Failed {
			// the upload failed - let's keep it around for a while - but mark it as finished
			pps.metrics.Finished.WithLabelValues("failed").Inc()
			if !pp.StartTime.IsZero() {
				pps.metrics.Duration.WithLabelValues("failed").Observe(time.Since(pp.StartTime).Seconds())
			}
			pp.Finished = true
			return storePP(pps.store, pp)
		}

		pps.metrics.Finished.WithLabelValues("succeeded").Inc()
		if !pp.StartTime.IsZero() {
			pps.metrics.Duration.WithLabelValues("succeeded").Observe(time.Since(pp.StartTime).Seconds())
		}
		// the storage provider thinks the upload is done - so no need to keep it any more
		if err := pps.store.Delete(ev.UploadID); err != nil {
			pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
			return fmt.Errorf("%w: cannot delete upload", ErrEvent)
		}
	case events.ResumePostprocessing:
		return pps.handleResumePPEvent(ctx, ev)
	}

	if pp != nil {
		ctx = ctxpkg.ContextSetInitiator(ctx, pp.InitiatorID)

		if err := storePP(pps.store, pp); err != nil {
			ackEvent = false
			pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
			return fmt.Errorf("%w: cannot store upload", ErrEvent)
		}
	}

	if next != nil {
		if err := events.Publish(ctx, pps.pub, next); err != nil {
			pps.log.Error().Err(err).Msg("unable to publish event")
			return fmt.Errorf("%w: unable to publish event", ErrFatal) // we can't publish -> we are screwed
		}
	}
	return nil
}
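
// getPP loads the persisted state for one upload. Each in-flight upload is
// kept in the go-micro store as a single record keyed by its upload ID, with
// the postprocessing.Postprocessing struct JSON-marshalled into the record
// value (see storePP below); the exact JSON shape is defined by that struct's
// tags, not here.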
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
	recs, err := sto.Read(uploadID)
	if err != nil {
		if err == store.ErrNotFound {
			return nil, ErrNotFound
		}
		return nil, err
	}

	if len(recs) == 0 {
		return nil, ErrNotFound
	}

	if len(recs) > 1 {
		return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
	}

	pp := postprocessing.New(pps.c)
	err = json.Unmarshal(recs[0].Value, pp)
	if err != nil {
		return nil, err
	}

	return pp, nil
}
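
// getSteps translates the configured step names one-to-one into
// events.Postprocessingstep values; the order of the resulting slice is the
// order in which the steps are requested for every upload. Which names are
// valid (for example the built-in delay step handled in processEvent) is
// determined by the config package and the services consuming these events,
// not by this function.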
func getSteps(c config.Postprocessing) []events.Postprocessingstep {
	// NOTE: the improved version only allows configuring the order of postprocessing steps,
	// but we aim for a system where postprocessing steps can be configured per space, ideally by the space admin.
	// We need to iterate on the configuration of the postprocessing service when we see fit.
	steps := make([]events.Postprocessingstep, 0, len(c.Steps))
	for _, s := range c.Steps {
		steps = append(steps, events.Postprocessingstep(s))
	}

	return steps
}

func storePP(sto store.Store, pp *postprocessing.Postprocessing) error {
	b, err := json.Marshal(pp)
	if err != nil {
		return err
	}

	return sto.Write(&store.Record{
		Key:   pp.ID,
		Value: b,
	})
}
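
// handleResumePPEvent resumes postprocessing either for a single upload ID or,
// when a step is given, for every stored upload currently parked at that step
// (found via a full scan of the store). resumePP republishes the current step
// for known, unfinished uploads and falls back to publishing a
// RestartPostprocessing event when no state is stored for the ID.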
func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev events.ResumePostprocessing) error {
	ids := []string{ev.UploadID}
	if ev.Step != "" {
		ids = pps.findUploadsByStep(ev.Step)
	}

	for _, id := range ids {
		if err := pps.resumePP(ctx, id); err != nil {
			pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload")
		}
	}
	return nil
}

func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error {
	pp, err := pps.getPP(pps.store, uploadID)
	if err != nil {
		if err == ErrNotFound {
			if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
				UploadID:  uploadID,
				Timestamp: utils.TSNow(),
			}); err != nil {
				return err
			}
			return nil
		}
		return fmt.Errorf("cannot get upload: %w", err)
	}

	if pp.Finished {
		// don't retry finished uploads
		return nil
	}

	return events.Publish(ctx, pps.pub, pp.CurrentStep())
}

func (pps *PostprocessingService) findUploadsByStep(step events.Postprocessingstep) []string {
	var ids []string

	keys, err := pps.store.List()
	if err != nil {
		pps.log.Error().Err(err).Msg("cannot list uploads")
	}

	for _, k := range keys {
		rec, err := pps.store.Read(k)
		if err != nil {
			pps.log.Error().Err(err).Msg("cannot read upload")
			continue
		}

		if len(rec) != 1 {
			pps.log.Error().Str("key", k).Int("records", len(rec)).Msg("expected exactly one result")
			continue
		}

		pp := &postprocessing.Postprocessing{}
		err = json.Unmarshal(rec[0].Value, pp)
		if err != nil {
			pps.log.Error().Err(err).Msg("cannot unmarshal upload")
			continue
		}

		if pp.Status.CurrentStep == step {
			ids = append(ids, pp.ID)
		}
	}

	return ids
}
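
// monitorMetrics polls the named JetStream consumer every five seconds and
// exports its unprocessed, outstanding-ack and redelivered counters as gauges,
// making the depth of the postprocessing backlog observable. The ticker is
// never stopped, so the polling goroutine lives for the lifetime of the
// process.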
func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
	ctx := context.Background()
	consumer, err := stream.JetStream().Consumer(ctx, name)
	if err != nil {
		logger.Error().Err(err).Msg("failed to get consumer")
		return
	}
	ticker := time.NewTicker(5 * time.Second)
	go func() {
		for range ticker.C {
			info, err := consumer.Info(ctx)
			if err != nil {
				logger.Error().Err(err).Msg("failed to get consumer info")
				continue
			}

			m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
			m.EventsUnprocessed.Set(float64(info.NumPending))
			m.EventsRedelivered.Set(float64(info.NumRedelivered))
			logger.Trace().Msg("updated postprocessing event metrics")
		}
	}()
}