Add tracing to policies service. (#7116)

This commit is contained in:
Daniel Swärd
2023-08-25 20:01:10 +02:00
committed by GitHub
parent 0eb4b5dc56
commit a5ac3e7ce7
5 changed files with 111 additions and 48 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
svcProtogen "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/policies/v0"
"github.com/owncloud/ocis/v2/services/policies/pkg/config"
@@ -50,13 +51,23 @@ func Server(cfg *config.Config) *cli.Command {
)
defer cancel()
traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
if err != nil {
return err
}
e, err := opa.NewOPA(cfg.Engine.Timeout, logger, cfg.Engine)
if err != nil {
return err
}
{
grpcClient, err := grpc.NewClient(grpc.GetClientOptions(cfg.GRPCClientTLS)...)
grpcClient, err := grpc.NewClient(
append(
grpc.GetClientOptions(cfg.GRPCClientTLS),
grpc.WithTraceProvider(traceProvider),
)...,
)
if err != nil {
return err
}
@@ -74,6 +85,7 @@ func Server(cfg *config.Config) *cli.Command {
grpc.Address(cfg.GRPC.Addr),
grpc.Namespace(cfg.GRPC.Namespace),
grpc.Version(version.GetString()),
grpc.TraceProvider(traceProvider),
)
if err != nil {
return err
@@ -103,7 +115,7 @@ func Server(cfg *config.Config) *cli.Command {
return err
}
eventSvc, err := svcEvent.New(bus, logger, e, cfg.Postprocessing.Query)
eventSvc, err := svcEvent.New(ctx, bus, logger, traceProvider, e, cfg.Postprocessing.Query)
if err != nil {
return err
}

View File

@@ -22,6 +22,7 @@ type Config struct {
Log *Log `yaml:"log"`
Engine Engine `yaml:"engine"`
Postprocessing Postprocessing `yaml:"postprocessing"`
Tracing *Tracing `yaml:"tracing"`
}
// Service defines the available service configuration.

View File

@@ -82,6 +82,18 @@ func EnsureDefaults(cfg *config.Config) {
if cfg.GRPC.TLS == nil && cfg.Commons != nil {
cfg.GRPC.TLS = structs.CopyOrZeroValue(cfg.Commons.GRPCServiceTLS)
}
// provide with defaults for shared tracing, since we need a valid destination address for "envdecode".
if cfg.Tracing == nil && cfg.Commons != nil && cfg.Commons.Tracing != nil {
cfg.Tracing = &config.Tracing{
Enabled: cfg.Commons.Tracing.Enabled,
Type: cfg.Commons.Tracing.Type,
Endpoint: cfg.Commons.Tracing.Endpoint,
Collector: cfg.Commons.Tracing.Collector,
}
} else if cfg.Tracing == nil {
cfg.Tracing = &config.Tracing{}
}
}
func Sanitize(_ *config.Config) {}

View File

@@ -0,0 +1,21 @@
package config
import "github.com/owncloud/ocis/v2/ocis-pkg/tracing"
// Tracing defines the available tracing configuration.
type Tracing struct {
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;POLICIES_TRACING_ENABLED" desc:"Activates tracing."`
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;POLICIES_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."`
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;POLICIES_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;POLICIES_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}
// Convert Tracing to the tracing package's Config struct.
func (t Tracing) Convert() tracing.Config {
return tracing.Config{
Enabled: t.Enabled,
Type: t.Type,
Endpoint: t.Endpoint,
Collector: t.Collector,
}
}

View File

@@ -6,21 +6,26 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/policies/pkg/engine"
"go.opentelemetry.io/otel/trace"
)
// Service defines the service handlers.
type Service struct {
ctx context.Context
query string
log log.Logger
stream events.Stream
engine engine.Engine
tp trace.TracerProvider
}
// New returns a service implementation for Service.
func New(stream events.Stream, logger log.Logger, engine engine.Engine, query string) (Service, error) {
func New(ctx context.Context, stream events.Stream, logger log.Logger, tp trace.TracerProvider, engine engine.Engine, query string) (Service, error) {
svc := Service{
ctx: ctx,
log: logger,
query: query,
tp: tp,
engine: engine,
stream: stream,
}
@@ -36,53 +41,65 @@ func (s Service) Run() error {
}
for e := range ch {
switch ev := e.Event.(type) {
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepPolicies {
continue
}
outcome := events.PPOutcomeContinue
if s.query != "" {
env := engine.Environment{
Stage: engine.StagePP,
Resource: engine.Resource{
Name: ev.Filename,
URL: ev.URL,
Size: ev.Filesize,
},
}
if ev.ExecutingUser != nil {
env.User = *ev.ExecutingUser
}
if ev.ResourceID != nil {
env.Resource.ID = *ev.ResourceID
}
result, err := s.engine.Evaluate(context.TODO(), s.query, env)
if err != nil {
s.log.Error().Err(err).Msg("unable evaluate policy")
}
if !result {
outcome = events.PPOutcomeDelete
}
}
if err := events.Publish(context.Background(), s.stream, events.PostprocessingStepFinished{
Outcome: outcome,
UploadID: ev.UploadID,
ExecutingUser: ev.ExecutingUser,
Filename: ev.Filename,
FinishedStep: ev.StepToStart,
}); err != nil {
return err
}
err := s.processEvent(e)
if err != nil {
return err
}
}
return nil
}
func (s Service) processEvent(e events.Event) error {
ctx := e.GetTraceContext(s.ctx)
ctx, span := s.tp.Tracer("policies").Start(ctx, "processEvent")
defer span.End()
switch ev := e.Event.(type) {
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepPolicies {
return nil
}
outcome := events.PPOutcomeContinue
if s.query != "" {
env := engine.Environment{
Stage: engine.StagePP,
Resource: engine.Resource{
Name: ev.Filename,
URL: ev.URL,
Size: ev.Filesize,
},
}
if ev.ExecutingUser != nil {
env.User = *ev.ExecutingUser
}
if ev.ResourceID != nil {
env.Resource.ID = *ev.ResourceID
}
result, err := s.engine.Evaluate(context.TODO(), s.query, env)
if err != nil {
s.log.Error().Err(err).Msg("unable evaluate policy")
}
if !result {
outcome = events.PPOutcomeDelete
}
}
if err := events.Publish(ctx, s.stream, events.PostprocessingStepFinished{
Outcome: outcome,
UploadID: ev.UploadID,
ExecutingUser: ev.ExecutingUser,
Filename: ev.Filename,
FinishedStep: ev.StepToStart,
}); err != nil {
return err
}
}
return nil
}