From 97034f4aaac7677bd724bdc31b2d7561a7775af5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Franke?= Date: Wed, 23 Aug 2023 15:52:26 +0200 Subject: [PATCH] Add tracing to postprocessing service (#7094) This adds tracing to the postprocessing service. --- services/postprocessing/pkg/command/server.go | 8 +- services/postprocessing/pkg/config/config.go | 8 - services/postprocessing/pkg/config/tracing.go | 21 +++ .../postprocessing/pkg/service/service.go | 156 +++++++++++------- 4 files changed, 120 insertions(+), 73 deletions(-) create mode 100644 services/postprocessing/pkg/config/tracing.go diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go index bf6f15f5b8..a32716f405 100644 --- a/services/postprocessing/pkg/command/server.go +++ b/services/postprocessing/pkg/command/server.go @@ -10,6 +10,7 @@ import ( "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" + "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser" @@ -47,6 +48,11 @@ func Server(cfg *config.Config) *cli.Command { ) defer cancel() + traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) + if err != nil { + return err + } + { bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events)) if err != nil { @@ -62,7 +68,7 @@ func Server(cfg *config.Config) *cli.Command { microstore.Table(cfg.Store.Table), ) - svc, err := service.NewPostprocessingService(bus, logger, st, cfg.Postprocessing) + svc, err := service.NewPostprocessingService(ctx, bus, logger, st, traceProvider, cfg.Postprocessing) if err != nil { return err } diff --git a/services/postprocessing/pkg/config/config.go b/services/postprocessing/pkg/config/config.go index 9377394643..993f5a1b56 100644 --- a/services/postprocessing/pkg/config/config.go +++ b/services/postprocessing/pkg/config/config.go @@ -48,14 +48,6 @@ type Debug struct { Zpages bool `yaml:"zpages" env:"POSTPROCESSING_DEBUG_ZPAGES" desc:"Enables zpages, which can be used for collecting and viewing in-memory traces."` } -// Tracing defines the available tracing configuration. -type Tracing struct { - Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;POSTPROCESSING_TRACING_ENABLED" desc:"Activates tracing."` - Type string `yaml:"type" env:"OCIS_TRACING_TYPE;POSTPROCESSING_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."` - Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;POSTPROCESSING_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."` - Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;POSTPROCESSING_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."` -} - // Store configures the store to use type Store struct { Store string `yaml:"store" env:"OCIS_PERSISTENT_STORE;POSTPROCESSING_STORE" desc:"The type of the store. Supported values are: 'memory', 'ocmem', 'etcd', 'redis', 'redis-sentinel', 'nats-js', 'noop'. See the text description for details."` diff --git a/services/postprocessing/pkg/config/tracing.go b/services/postprocessing/pkg/config/tracing.go new file mode 100644 index 0000000000..9f6e61270a --- /dev/null +++ b/services/postprocessing/pkg/config/tracing.go @@ -0,0 +1,21 @@ +package config + +import "github.com/owncloud/ocis/v2/ocis-pkg/tracing" + +// Tracing defines the available tracing configuration. +type Tracing struct { + Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;POSTPROCESSING_TRACING_ENABLED" desc:"Activates tracing."` + Type string `yaml:"type" env:"OCIS_TRACING_TYPE;POSTPROCESSING_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."` + Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;POSTPROCESSING_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."` + Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;POSTPROCESSING_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."` +} + +// Convert Tracing to the tracing package's Config struct. +func (t Tracing) Convert() tracing.Config { + return tracing.Config{ + Enabled: t.Enabled, + Type: t.Type, + Endpoint: t.Endpoint, + Collector: t.Collector, + } +} diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 01c264dbd8..032d2c8b16 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -3,6 +3,7 @@ package service import ( "context" "encoding/json" + "errors" "fmt" "github.com/cs3org/reva/v2/pkg/events" @@ -10,20 +11,30 @@ import ( "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" "github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing" "go-micro.dev/v4/store" + "go.opentelemetry.io/otel/trace" ) // PostprocessingService is an instance of the service handling postprocessing of files type PostprocessingService struct { + ctx context.Context log log.Logger events <-chan events.Event pub events.Publisher steps []events.Postprocessingstep store store.Store c config.Postprocessing + tp trace.TracerProvider } +var ( + // errFatal is returned when a fatal error occurs and we want to exit. + errFatal = errors.New("fatal error") + // ErrEvent is returned when something went wrong with a specific event. + errEvent = errors.New("event error") +) + // NewPostprocessingService returns a new instance of a postprocessing service -func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store.Store, c config.Postprocessing) (*PostprocessingService, error) { +func NewPostprocessingService(ctx context.Context, stream events.Stream, logger log.Logger, sto store.Store, tp trace.TracerProvider, c config.Postprocessing) (*PostprocessingService, error) { evs, err := events.Consume(stream, "postprocessing", events.BytesReceived{}, events.StartPostprocessingStep{}, @@ -36,89 +47,106 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store } return &PostprocessingService{ + ctx: ctx, log: logger, events: evs, pub: stream, steps: getSteps(c), store: sto, c: c, + tp: tp, }, nil } // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { - ctx := context.Background() for e := range pps.events { - var ( - next interface{} - pp *postprocessing.Postprocessing - err error - ) + err := pps.processEvent(e) + if err != nil { + switch { + case errors.Is(err, errFatal): + return err + case errors.Is(err, errEvent): + continue + default: + pps.log.Fatal().Err(err).Msg("unknown error - exiting") + } + } + } + return nil +} - ctx = e.GetTraceContext(ctx) +func (pps *PostprocessingService) processEvent(e events.Event) error { + var ( + next interface{} + pp *postprocessing.Postprocessing + err error + ) - switch ev := e.Event.(type) { - case events.BytesReceived: - pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) - next = pp.Init(ev) - case events.PostprocessingStepFinished: - if ev.UploadID == "" { - // no current upload - this was an on demand scan - continue - } - pp, err = getPP(pps.store, ev.UploadID) - if err != nil { - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - continue - } - next = pp.NextStep(ev) - case events.StartPostprocessingStep: - if ev.StepToStart != events.PPStepDelay { - continue - } - pp, err = getPP(pps.store, ev.UploadID) - if err != nil { - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - continue - } - next = pp.Delay(ev) - case events.UploadReady: - // the storage provider thinks the upload is done - so no need to keep it any more - if err := pps.store.Delete(ev.UploadID); err != nil { - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") - continue - } - case events.ResumePostprocessing: - pp, err = getPP(pps.store, ev.UploadID) - if err != nil { - if err == store.ErrNotFound { - if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{ - UploadID: ev.UploadID, - Timestamp: ev.Timestamp, - }); err != nil { - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event") - } - continue + ctx := e.GetTraceContext(pps.ctx) + ctx, span := pps.tp.Tracer("postprocessing").Start(ctx, "processEvent") + defer span.End() + + switch ev := e.Event.(type) { + case events.BytesReceived: + pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) + next = pp.Init(ev) + case events.PostprocessingStepFinished: + if ev.UploadID == "" { + // no current upload - this was an on demand scan + return nil + } + pp, err = getPP(pps.store, ev.UploadID) + if err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") + return fmt.Errorf("%w: cannot get upload", errEvent) + } + next = pp.NextStep(ev) + case events.StartPostprocessingStep: + if ev.StepToStart != events.PPStepDelay { + return nil + } + pp, err = getPP(pps.store, ev.UploadID) + if err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") + return fmt.Errorf("%w: cannot get upload", errEvent) + } + next = pp.Delay(ev) + case events.UploadReady: + // the storage provider thinks the upload is done - so no need to keep it any more + if err := pps.store.Delete(ev.UploadID); err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") + return fmt.Errorf("%w: cannot delete upload", errEvent) + } + case events.ResumePostprocessing: + pp, err = getPP(pps.store, ev.UploadID) + if err != nil { + if err == store.ErrNotFound { + if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{ + UploadID: ev.UploadID, + Timestamp: ev.Timestamp, + }); err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event") } - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - continue + return fmt.Errorf("%w: cannot publish RestartPostprocessing event", errEvent) } - next = pp.CurrentStep() + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") + return fmt.Errorf("%w: cannot get upload", errEvent) } + next = pp.CurrentStep() + } - if pp != nil { - if err := storePP(pps.store, pp); err != nil { - pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") - continue // TODO: should we really continue here? - } + if pp != nil { + if err := storePP(pps.store, pp); err != nil { + pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") + return fmt.Errorf("%w: cannot store upload", errEvent) } - if next != nil { - if err := events.Publish(ctx, pps.pub, next); err != nil { - pps.log.Error().Err(err).Msg("unable to publish event") - return err // we can't publish -> we are screwed - } + } + if next != nil { + if err := events.Publish(ctx, pps.pub, next); err != nil { + pps.log.Error().Err(err).Msg("unable to publish event") + return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed } - } return nil }