mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-02 19:20:08 -05:00
Bumps [github.com/open-policy-agent/opa](https://github.com/open-policy-agent/opa) from 1.6.0 to 1.8.0. - [Release notes](https://github.com/open-policy-agent/opa/releases) - [Changelog](https://github.com/open-policy-agent/opa/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-policy-agent/opa/compare/v1.6.0...v1.8.0) --- updated-dependencies: - dependency-name: github.com/open-policy-agent/opa dependency-version: 1.8.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
1196 lines
34 KiB
Go
1196 lines
34 KiB
Go
// Copyright 2018 The OPA Authors. All rights reserved.
|
|
// Use of this source code is governed by an Apache2
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// Package plugins implements plugin management for the policy engine.
|
|
package plugins
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"maps"
|
|
mr "math/rand"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/open-policy-agent/opa/internal/report"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"go.opentelemetry.io/otel/sdk/trace"
|
|
|
|
bundleUtils "github.com/open-policy-agent/opa/internal/bundle"
|
|
cfg "github.com/open-policy-agent/opa/internal/config"
|
|
initload "github.com/open-policy-agent/opa/internal/runtime/init"
|
|
"github.com/open-policy-agent/opa/v1/ast"
|
|
"github.com/open-policy-agent/opa/v1/bundle"
|
|
"github.com/open-policy-agent/opa/v1/config"
|
|
"github.com/open-policy-agent/opa/v1/hooks"
|
|
"github.com/open-policy-agent/opa/v1/keys"
|
|
"github.com/open-policy-agent/opa/v1/loader"
|
|
"github.com/open-policy-agent/opa/v1/logging"
|
|
"github.com/open-policy-agent/opa/v1/plugins/rest"
|
|
"github.com/open-policy-agent/opa/v1/resolver/wasm"
|
|
"github.com/open-policy-agent/opa/v1/storage"
|
|
"github.com/open-policy-agent/opa/v1/topdown/cache"
|
|
"github.com/open-policy-agent/opa/v1/topdown/print"
|
|
"github.com/open-policy-agent/opa/v1/tracing"
|
|
)
|
|
|
|
// Factory defines the interface OPA uses to instantiate your plugin.
//
// When OPA processes its configuration it looks for factories that
// have been registered by calling runtime.RegisterPlugin. Factories
// are registered to a name which is used to key into the
// configuration blob. If your plugin has not been configured, your
// factory will not be invoked.
//
//	plugins:
//	  my_plugin1:
//	    some_key: foo
//	  # my_plugin2:
//	  #   some_key2: bar
//
// If OPA was started with the configuration above and received two
// calls to runtime.RegisterPlugin (one with NAME "my_plugin1" and
// one with NAME "my_plugin2"), it would only invoke the factory
// for my_plugin1.
//
// OPA instantiates and reconfigures plugins in two steps. First, OPA
// will call Validate to check the configuration. Assuming the
// configuration is valid, your factory should return a configuration
// value that can be used to construct your plugin. Second, OPA will
// call New to instantiate your plugin providing the configuration
// value returned from the Validate call.
//
// Validate receives a slice of bytes representing plugin
// configuration and returns a configuration value that can be used to
// instantiate your plugin. The manager is provided to give access to
// OPA's compiler, storage layer, and global configuration. Your
// Validate function will typically:
//
//  1. Deserialize the raw config bytes
//  2. Validate the deserialized config for semantic errors
//  3. Inject default values
//  4. Return a deserialized/parsed config
//
// New receives a valid configuration for your plugin and returns a
// plugin object. Your New function will typically:
//
//  1. Cast the config value to its own type
//  2. Instantiate a plugin object
//  3. Return the plugin object
//  4. Update status via `plugins.Manager#UpdatePluginStatus`
//
// After a plugin has been created, subsequent status updates can be
// sent anytime the plugin enters a ready or error state.
type Factory interface {
	Validate(manager *Manager, config []byte) (any, error)
	New(manager *Manager, config any) Plugin
}
|
|
|
|
// Plugin defines the interface OPA uses to manage your plugin.
//
// When OPA starts it will start all of the plugins it was configured
// to instantiate. Each time a new plugin is configured (via
// discovery), OPA will start it. You can use the Start call to spawn
// additional goroutines or perform initialization tasks.
//
// Manager.Stop calls Stop on every registered plugin, passing a context
// that plugins should honor when performing cleanup.
//
// When OPA receives new configuration for your plugin via discovery
// it will first Validate the configuration using your factory and
// then call Reconfigure.
type Plugin interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context)
	Reconfigure(ctx context.Context, config any)
}
|
|
|
|
// Triggerable defines the interface plugins use for manual plugin triggers.
// Trigger reports failure through its error result.
type Triggerable interface {
	Trigger(context.Context) error
}
|
|
|
|
// State defines the state that a Plugin instance is currently
// in with pre-defined states.
type State string

const (
	// StateNotReady indicates that the Plugin is not in an error state, but isn't
	// ready for normal operation yet. This should only happen at
	// initialization time.
	StateNotReady State = "NOT_READY"

	// StateOK signifies that the Plugin is operating normally.
	StateOK State = "OK"

	// StateErr indicates that the Plugin is in an error state and should not
	// be considered as functional.
	StateErr State = "ERROR"

	// StateWarn indicates the Plugin is operating, but in a potentially dangerous or
	// degraded state. It may be used to indicate manual remediation is needed, or to
	// alert admins of some other noteworthy state.
	StateWarn State = "WARN"
)
|
|
|
|
// TriggerMode defines the trigger mode utilized by a Plugin for bundle download,
// log upload etc.
type TriggerMode string

const (
	// TriggerPeriodic represents the periodic polling mechanism.
	TriggerPeriodic TriggerMode = "periodic"

	// TriggerManual represents the manual triggering mechanism.
	TriggerManual TriggerMode = "manual"

	// DefaultTriggerMode represents the default trigger mechanism; it has the
	// same value as TriggerPeriodic.
	DefaultTriggerMode TriggerMode = "periodic"
)
|
|
|
|
// defaultUploadIntervalSec is the default interval between OPA report
// uploads (in seconds, going by its name).
var defaultUploadIntervalSec = int64(3600)
|
|
|
|
// Status has a Plugin's current status plus an optional Message.
// Message is omitted from the JSON encoding when empty.
type Status struct {
	State   State  `json:"state"`
	Message string `json:"message,omitempty"`
}
|
|
|
|
func (s *Status) String() string {
|
|
return fmt.Sprintf("{%v %q}", s.State, s.Message)
|
|
}
|
|
|
|
func (s *Status) Equal(other *Status) bool {
|
|
if s == nil || other == nil {
|
|
return s == nil && other == nil
|
|
}
|
|
|
|
return s.State == other.State && s.Message == other.Message
|
|
}
|
|
|
|
// StatusListener defines a handler to register for status updates. The
// handler receives a map from plugin name to that plugin's status.
type StatusListener func(status map[string]*Status)
|
|
|
|
// Manager implements lifecycle management of plugins and gives plugins access
// to engine-wide components like storage.
type Manager struct {
	Store storage.Store
	// Config values should be accessed from the thread-safe GetConfig method.
	Config *config.Config
	Info   *ast.Term
	ID     string

	// compiler is read and written under compilerMux (see GetCompiler/setCompiler).
	compiler    *ast.Compiler
	compilerMux sync.RWMutex
	// wasmResolvers is read and written under wasmResolversMtx.
	wasmResolvers      []*wasm.Resolver
	wasmResolversMtx   sync.RWMutex
	services           map[string]rest.Client
	keys               map[string]*keys.Config
	plugins            []namedplugin
	registeredTriggers []func(storage.Transaction)
	// mtx is held by the accessors for Config, plugins, pluginStatus,
	// pluginStatusListeners, router and the inter-query cache config.
	mtx                          sync.Mutex
	pluginStatus                 map[string]*Status
	pluginStatusListeners        map[string]StatusListener
	initBundles                  map[string]*bundle.Bundle
	initFiles                    loader.Result
	maxErrors                    int
	initialized                  bool
	interQueryBuiltinCacheConfig *cache.Config
	// gracefulShutdownPeriod is interpreted as seconds by Stop.
	gracefulShutdownPeriod  int
	registeredCacheTriggers []func(*cache.Config)
	logger                  logging.Logger
	consoleLogger           logging.Logger
	serverInitialized       chan struct{}
	serverInitializedOnce   sync.Once
	printHook               print.Hook
	enablePrintStatements   bool
	router                  *http.ServeMux
	prometheusRegister      prometheus.Registerer
	tracerProvider          *trace.TracerProvider
	// distributedTacingOpts (sic) holds the distributed tracing options; the
	// spelling matches the ServiceOptions field it is copied into.
	distributedTacingOpts        tracing.Options
	registeredNDCacheTriggers    []func(bool)
	registeredTelemetryGatherers map[string]report.Gatherer
	// bootstrapConfigLabels are the labels of the config passed to New;
	// Reconfigure re-applies them on top of every new configuration.
	bootstrapConfigLabels map[string]string
	hooks                 hooks.Hooks
	enableTelemetry       bool
	reporter              report.Reporter
	// opaReportNotifyCh and stop are created by Init when telemetry is enabled.
	// stop carries a "done" channel on which the receiver acknowledges.
	opaReportNotifyCh     chan struct{}
	stop                  chan chan struct{}
	parserOptions         ast.ParserOptions
	extraRoutes           map[string]ExtraRoute
	extraMiddlewares      []func(http.Handler) http.Handler
	extraAuthorizerRoutes []func(string, []any) bool
	bundleActivatorPlugin string
}
|
|
|
|
// Distinct key types for values the manager stores on a storage.Context, so
// the compiler key and the wasm resolver key cannot collide with each other.
type (
	managerContextKey      string
	managerWasmResolverKey string
)
|
|
|
|
const (
	// managerCompilerContextKey is the storage context key for the cached compiler.
	managerCompilerContextKey = managerContextKey("compiler")
	// managerWasmResolverContextKey is the storage context key for the cached wasm resolvers.
	managerWasmResolverContextKey = managerWasmResolverKey("wasmResolvers")
)
|
|
|
|
// SetCompilerOnContext puts the compiler into the storage context. Calling this
// function before committing updated policies to storage allows the manager to
// skip parsing and compiling of modules. Instead, the manager will use the
// compiler that was stored on the context.
func SetCompilerOnContext(context *storage.Context, compiler *ast.Compiler) {
	context.Put(managerCompilerContextKey, compiler)
}
|
|
|
|
// GetCompilerOnContext gets the compiler cached on the storage context.
|
|
func GetCompilerOnContext(context *storage.Context) *ast.Compiler {
|
|
compiler, ok := context.Get(managerCompilerContextKey).(*ast.Compiler)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return compiler
|
|
}
|
|
|
|
// SetWasmResolversOnContext puts a set of Wasm Resolvers into the storage
// context. Calling this function before committing updated wasm modules to
// storage allows the manager to skip initializing modules before using them.
// Instead, the manager will use the resolvers that were stored on the context.
func SetWasmResolversOnContext(context *storage.Context, rs []*wasm.Resolver) {
	context.Put(managerWasmResolverContextKey, rs)
}
|
|
|
|
// getWasmResolversOnContext gets the resolvers cached on the storage context.
|
|
func getWasmResolversOnContext(context *storage.Context) []*wasm.Resolver {
|
|
resolvers, ok := context.Get(managerWasmResolverContextKey).([]*wasm.Resolver)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return resolvers
|
|
}
|
|
|
|
func validateTriggerMode(mode TriggerMode) error {
|
|
switch mode {
|
|
case TriggerPeriodic, TriggerManual:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("invalid trigger mode %q (want %q or %q)", mode, TriggerPeriodic, TriggerManual)
|
|
}
|
|
}
|
|
|
|
// ValidateAndInjectDefaultsForTriggerMode validates the trigger mode and injects default values
|
|
func ValidateAndInjectDefaultsForTriggerMode(a, b *TriggerMode) (*TriggerMode, error) {
|
|
if a == nil && b != nil {
|
|
err := validateTriggerMode(*b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return b, nil
|
|
} else if a != nil && b == nil {
|
|
err := validateTriggerMode(*a)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return a, nil
|
|
} else if a != nil && b != nil {
|
|
if *a != *b {
|
|
return nil, fmt.Errorf("trigger mode mismatch: %s and %s (hint: check discovery configuration)", *a, *b)
|
|
}
|
|
err := validateTriggerMode(*a)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return a, nil
|
|
}
|
|
|
|
t := DefaultTriggerMode
|
|
return &t, nil
|
|
}
|
|
|
|
// namedplugin pairs a registered Plugin with the name it was registered under.
type namedplugin struct {
	name   string
	plugin Plugin
}
|
|
|
|
// Info sets the runtime information on the manager. The runtime information is
|
|
// propagated to opa.runtime() built-in function calls.
|
|
func Info(term *ast.Term) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.Info = term
|
|
}
|
|
}
|
|
|
|
// InitBundles provides the initial set of bundles to load.
|
|
func InitBundles(b map[string]*bundle.Bundle) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.initBundles = b
|
|
}
|
|
}
|
|
|
|
// InitFiles provides the initial set of other data/policy files to load.
|
|
func InitFiles(f loader.Result) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.initFiles = f
|
|
}
|
|
}
|
|
|
|
// MaxErrors sets the error limit for the manager's shared compiler.
|
|
func MaxErrors(n int) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.maxErrors = n
|
|
}
|
|
}
|
|
|
|
// GracefulShutdownPeriod passes the configured graceful shutdown period to plugins
|
|
func GracefulShutdownPeriod(gracefulShutdownPeriod int) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.gracefulShutdownPeriod = gracefulShutdownPeriod
|
|
}
|
|
}
|
|
|
|
// Logger configures the passed logger on the plugin manager (useful to
|
|
// configure default fields)
|
|
func Logger(logger logging.Logger) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.logger = logger
|
|
}
|
|
}
|
|
|
|
// ConsoleLogger sets the passed logger to be used by plugins that are
|
|
// configured with console logging enabled.
|
|
func ConsoleLogger(logger logging.Logger) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.consoleLogger = logger
|
|
}
|
|
}
|
|
|
|
func EnablePrintStatements(yes bool) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.enablePrintStatements = yes
|
|
}
|
|
}
|
|
|
|
func PrintHook(h print.Hook) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.printHook = h
|
|
}
|
|
}
|
|
|
|
func WithRouter(r *http.ServeMux) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.router = r
|
|
}
|
|
}
|
|
|
|
// WithPrometheusRegister sets the passed prometheus.Registerer to be used by plugins
|
|
func WithPrometheusRegister(prometheusRegister prometheus.Registerer) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.prometheusRegister = prometheusRegister
|
|
}
|
|
}
|
|
|
|
// WithTracerProvider sets the passed *trace.TracerProvider to be used by plugins
|
|
func WithTracerProvider(tracerProvider *trace.TracerProvider) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.tracerProvider = tracerProvider
|
|
}
|
|
}
|
|
|
|
// WithDistributedTracingOpts sets the options to be used by distributed tracing.
|
|
func WithDistributedTracingOpts(tr tracing.Options) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.distributedTacingOpts = tr
|
|
}
|
|
}
|
|
|
|
// WithHooks allows passing hooks to the plugin manager.
|
|
func WithHooks(hs hooks.Hooks) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.hooks = hs
|
|
}
|
|
}
|
|
|
|
// WithParserOptions sets the parser options to be used by the plugin manager.
|
|
func WithParserOptions(opts ast.ParserOptions) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.parserOptions = opts
|
|
}
|
|
}
|
|
|
|
// WithEnableTelemetry controls whether OPA will send telemetry reports to an external service.
|
|
func WithEnableTelemetry(enableTelemetry bool) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.enableTelemetry = enableTelemetry
|
|
}
|
|
}
|
|
|
|
// WithTelemetryGatherers allows registration of telemetry gatherers which enable injection of additional data in the
|
|
// telemetry report
|
|
func WithTelemetryGatherers(gs map[string]report.Gatherer) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.registeredTelemetryGatherers = gs
|
|
}
|
|
}
|
|
|
|
// WithBundleActivatorPlugin sets the name of the activator plugin to load bundles into the store
|
|
func WithBundleActivatorPlugin(bundleActivatorPlugin string) func(*Manager) {
|
|
return func(m *Manager) {
|
|
m.bundleActivatorPlugin = bundleActivatorPlugin
|
|
}
|
|
}
|
|
|
|
// New creates a new Manager using config.
//
// raw is the serialized configuration and id is passed to config.ParseConfig
// together with it; store becomes the manager's Store. Each option in opts is
// applied to the manager before defaults are filled in. New returns an error
// if the configuration cannot be parsed, if any config hook fails, or if the
// keys, caching, or services sections are invalid.
func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*Manager, error) {
	parsedConfig, err := config.ParseConfig(raw, id)
	if err != nil {
		return nil, err
	}

	m := &Manager{
		Store:                 store,
		Config:                parsedConfig,
		ID:                    id,
		pluginStatus:          map[string]*Status{},
		pluginStatusListeners: map[string]StatusListener{},
		maxErrors:             -1,
		serverInitialized:     make(chan struct{}),
		bootstrapConfigLabels: parsedConfig.Labels,
		extraRoutes:           map[string]ExtraRoute{},
	}

	for _, f := range opts {
		f(m)
	}

	if m.parserOptions.RegoVersion == ast.RegoUndefined {
		// Default to v1 if rego-version is not set through options
		m.parserOptions.RegoVersion = ast.DefaultRegoVersion
	}

	// Fall back to default loggers when none were supplied via options.
	if m.logger == nil {
		m.logger = logging.Get()
	}

	if m.consoleLogger == nil {
		m.consoleLogger = logging.New()
	}

	// Give every ConfigHook a chance to replace the parsed config. Errors are
	// accumulated so that all failing hooks are reported together.
	// NOTE(review): m.Config and m.bootstrapConfigLabels were captured from the
	// pre-hook config above and are not reassigned here; if a hook returns a
	// different *config.Config they keep the old value — confirm this is intended.
	m.hooks.Each(func(h hooks.Hook) {
		if f, ok := h.(hooks.ConfigHook); ok {
			if c, e := f.OnConfig(context.Background(), parsedConfig); e != nil {
				err = errors.Join(err, e)
			} else {
				parsedConfig = c
			}
		}
	})
	if err != nil {
		return nil, err
	}

	// do after options and overrides
	m.keys, err = keys.ParseKeysConfig(parsedConfig.Keys)
	if err != nil {
		return nil, err
	}

	m.interQueryBuiltinCacheConfig, err = cache.ParseCachingConfig(parsedConfig.Caching)
	if err != nil {
		return nil, err
	}

	// DefaultServiceOpts reads m.keys and m.logger, so it must run after both
	// have been set above.
	serviceOpts := m.DefaultServiceOpts(parsedConfig)

	m.services, err = cfg.ParseServicesConfig(serviceOpts)
	if err != nil {
		return nil, err
	}

	if m.enableTelemetry {
		reporter, err := report.New(report.Options{Logger: m.logger})
		if err != nil {
			return nil, err
		}
		m.reporter = reporter

		// Report the minimum compatible version of the current compiler, or an
		// empty string when no compiler (or no requirement info) is available.
		m.reporter.RegisterGatherer("min_compatible_version", func(_ context.Context) (any, error) {
			var minimumCompatibleVersion string
			if c := m.GetCompiler(); c != nil && c.Required != nil {
				minimumCompatibleVersion, _ = c.Required.MinimumCompatibleVersion()
			}
			return minimumCompatibleVersion, nil
		})

		// register any additional gatherers
		for k, g := range m.registeredTelemetryGatherers {
			m.reporter.RegisterGatherer(k, g)
		}
	}

	return m, nil
}
|
|
|
|
// Init returns an error if the manager could not initialize itself. Init() should
// be called before Start(). Init() is idempotent.
//
// Within a single write transaction Init inserts and compiles the initial
// files and bundles, caches the resulting compiler and the wasm resolvers on
// the transaction context, and registers onCommit as a storage trigger. When
// telemetry is enabled it also starts the report upload loop.
//
// NOTE(review): m.initialized is read and written here without holding a
// lock — presumably Init is not expected to be called concurrently; confirm.
func (m *Manager) Init(ctx context.Context) error {
	if m.initialized {
		return nil
	}

	params := storage.TransactionParams{
		Write:   true,
		Context: storage.NewContext(),
	}

	if m.enableTelemetry {
		m.opaReportNotifyCh = make(chan struct{})
		m.stop = make(chan chan struct{})
		go m.sendOPAUpdateLoop(ctx)
	}

	err := storage.Txn(ctx, m.Store, params, func(txn storage.Transaction) error {
		result, err := initload.InsertAndCompile(ctx, initload.InsertAndCompileOptions{
			Store:                 m.Store,
			Txn:                   txn,
			Files:                 m.initFiles,
			Bundles:               m.initBundles,
			MaxErrors:             m.maxErrors,
			EnablePrintStatements: m.enablePrintStatements,
			ParserOptions:         m.parserOptions,
			BundleActivatorPlugin: m.bundleActivatorPlugin,
		})
		if err != nil {
			return err
		}

		// Cache the compiler on the context so onCommit can pick it up
		// instead of recompiling from the store.
		SetCompilerOnContext(params.Context, result.Compiler)

		resolvers, err := bundleUtils.LoadWasmResolversFromStore(ctx, m.Store, txn, nil)
		if err != nil {
			return err
		}
		SetWasmResolversOnContext(params.Context, resolvers)

		_, err = m.Store.Register(ctx, txn, storage.TriggerConfig{OnCommit: m.onCommit})
		return err
	})
	if err != nil {
		// Ask the telemetry loop started above to stop and wait for its
		// acknowledgement before reporting the failure.
		// NOTE(review): m.stop stays non-nil afterwards; presumably the loop
		// exits after acknowledging, in which case a later Stop() would block
		// on the send — verify against sendOPAUpdateLoop.
		if m.stop != nil {
			done := make(chan struct{})
			m.stop <- done
			<-done
		}

		return err
	}

	m.initialized = true
	return nil
}
|
|
|
|
// Labels returns the set of labels from the configuration.
|
|
func (m *Manager) Labels() map[string]string {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
|
|
return maps.Clone(m.Config.Labels)
|
|
}
|
|
|
|
// InterQueryBuiltinCacheConfig returns the configuration for the inter-query caches.
|
|
func (m *Manager) InterQueryBuiltinCacheConfig() *cache.Config {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
|
|
return m.interQueryBuiltinCacheConfig.Clone()
|
|
}
|
|
|
|
// GetConfig returns a deep copy of the manager's configuration.
|
|
func (m *Manager) GetConfig() *config.Config {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
|
|
return m.Config.Clone()
|
|
}
|
|
|
|
// Register adds a plugin to the manager. When the manager is started, all of
|
|
// the plugins will be started.
|
|
func (m *Manager) Register(name string, plugin Plugin) {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
m.plugins = append(m.plugins, namedplugin{
|
|
name: name,
|
|
plugin: plugin,
|
|
})
|
|
if _, ok := m.pluginStatus[name]; !ok {
|
|
m.pluginStatus[name] = &Status{State: StateNotReady}
|
|
}
|
|
}
|
|
|
|
// Plugins returns the list of plugins registered with the manager.
|
|
func (m *Manager) Plugins() []string {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
result := make([]string, len(m.plugins))
|
|
for i := range m.plugins {
|
|
result[i] = m.plugins[i].name
|
|
}
|
|
return result
|
|
}
|
|
|
|
// Plugin returns the plugin registered with name or nil if name is not found.
|
|
func (m *Manager) Plugin(name string) Plugin {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
for i := range m.plugins {
|
|
if m.plugins[i].name == name {
|
|
return m.plugins[i].plugin
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AuthPlugin returns the HTTPAuthPlugin registered with name or nil if name is not found.
|
|
func (m *Manager) AuthPlugin(name string) rest.HTTPAuthPlugin {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
for i := range m.plugins {
|
|
if m.plugins[i].name == name {
|
|
return m.plugins[i].plugin.(rest.HTTPAuthPlugin)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetCompiler returns the manager's compiler.
|
|
func (m *Manager) GetCompiler() *ast.Compiler {
|
|
m.compilerMux.RLock()
|
|
defer m.compilerMux.RUnlock()
|
|
return m.compiler
|
|
}
|
|
|
|
// setCompiler replaces the manager's compiler while holding the compiler
// write lock.
func (m *Manager) setCompiler(compiler *ast.Compiler) {
	m.compilerMux.Lock()
	defer m.compilerMux.Unlock()
	m.compiler = compiler
}
|
|
|
|
// ExtraRoute describes an additional HTTP route registered through
// Manager.ExtraRoute.
type ExtraRoute struct {
	PromName    string // name is for prometheus metrics
	HandlerFunc http.HandlerFunc
}
|
|
|
|
// ExtraRoutes returns the extra routes registered via ExtraRoute, keyed by
// path. The manager's own map is returned, not a copy.
func (m *Manager) ExtraRoutes() map[string]ExtraRoute {
	return m.extraRoutes
}
|
|
|
|
// ExtraMiddlewares returns the middlewares registered via ExtraMiddleware,
// in registration order. The manager's own slice is returned, not a copy.
func (m *Manager) ExtraMiddlewares() []func(http.Handler) http.Handler {
	return m.extraMiddlewares
}
|
|
|
|
// ExtraAuthorizerRoutes returns the path validator functions registered via
// ExtraAuthorizerRoute, in registration order. The manager's own slice is
// returned, not a copy.
func (m *Manager) ExtraAuthorizerRoutes() []func(string, []any) bool {
	return m.extraAuthorizerRoutes
}
|
|
|
|
// ExtraRoute registers an extra route to be served by the HTTP
|
|
// server later. Using this instead of directly registering routes
|
|
// with GetRouter() lets the server apply its handler wrapping for
|
|
// Prometheus and OpenTelemetry.
|
|
// Caution: This cannot be used to dynamically register and un-
|
|
// register HTTP handlers. It's meant as a late-stage set up helper,
|
|
// to be called from a plugin's init methods.
|
|
func (m *Manager) ExtraRoute(path, name string, hf http.HandlerFunc) {
|
|
if _, ok := m.extraRoutes[path]; ok {
|
|
panic("extra route already registered: " + path)
|
|
}
|
|
m.extraRoutes[path] = ExtraRoute{
|
|
PromName: name,
|
|
HandlerFunc: hf,
|
|
}
|
|
}
|
|
|
|
// ExtraMiddleware registers extra middlewares (`func(http.Handler) http.Handler`)
// to be injected into the HTTP handler chain in the server later. The
// middlewares are appended in the order given.
// Caution: This cannot be used to dynamically register and un-
// register middlewares. It's meant as a late-stage set up helper,
// to be called from a plugin's init methods.
func (m *Manager) ExtraMiddleware(mw ...func(http.Handler) http.Handler) {
	m.extraMiddlewares = append(m.extraMiddlewares, mw...)
}
|
|
|
|
// ExtraAuthorizerRoute registers an extra URL path validator function for use
// in the server authorizer. These functions designate specific methods and URL
// prefixes or paths where the authorizer should allow request body parsing.
// Caution: This cannot be used to dynamically register and un-
// register path validator functions. It's meant as a late-stage
// set up helper, to be called from a plugin's init methods.
func (m *Manager) ExtraAuthorizerRoute(validatorFunc func(string, []any) bool) {
	m.extraAuthorizerRoutes = append(m.extraAuthorizerRoutes, validatorFunc)
}
|
|
|
|
// GetRouter returns the managers router if set
|
|
func (m *Manager) GetRouter() *http.ServeMux {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
return m.router
|
|
}
|
|
|
|
// RegisterCompilerTrigger registers for change notifications when the compiler
// is changed. The callback is appended to the manager's trigger list under the
// manager lock and is invoked by onCommit with the committing transaction.
func (m *Manager) RegisterCompilerTrigger(f func(storage.Transaction)) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.registeredTriggers = append(m.registeredTriggers, f)
}
|
|
|
|
// GetWasmResolvers returns the manager's set of Wasm Resolvers.
|
|
func (m *Manager) GetWasmResolvers() []*wasm.Resolver {
|
|
m.wasmResolversMtx.RLock()
|
|
defer m.wasmResolversMtx.RUnlock()
|
|
return m.wasmResolvers
|
|
}
|
|
|
|
// setWasmResolvers replaces the manager's set of Wasm Resolvers while holding
// the resolver write lock.
func (m *Manager) setWasmResolvers(rs []*wasm.Resolver) {
	m.wasmResolversMtx.Lock()
	defer m.wasmResolversMtx.Unlock()
	m.wasmResolvers = rs
}
|
|
|
|
// Start starts the manager. Init() should be called once before Start().
|
|
func (m *Manager) Start(ctx context.Context) error {
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
|
|
if !m.initialized {
|
|
if err := m.Init(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
var toStart []Plugin
|
|
|
|
func() {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
toStart = make([]Plugin, len(m.plugins))
|
|
for i := range m.plugins {
|
|
toStart[i] = m.plugins[i].plugin
|
|
}
|
|
}()
|
|
|
|
for i := range toStart {
|
|
if err := toStart[i].Start(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the manager, stopping all the plugins registered with it.
|
|
// Any plugin that needs to perform cleanup should do so within the duration
|
|
// of the graceful shutdown period passed with the context as a timeout.
|
|
// Note that a graceful shutdown period configured with the Manager instance
|
|
// will override the timeout of the passed in context (if applicable).
|
|
func (m *Manager) Stop(ctx context.Context) {
|
|
var toStop []Plugin
|
|
|
|
func() {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
toStop = make([]Plugin, len(m.plugins))
|
|
for i := range m.plugins {
|
|
toStop[i] = m.plugins[i].plugin
|
|
}
|
|
}()
|
|
|
|
var cancel context.CancelFunc
|
|
if m.gracefulShutdownPeriod > 0 {
|
|
ctx, cancel = context.WithTimeout(ctx, time.Duration(m.gracefulShutdownPeriod)*time.Second)
|
|
} else {
|
|
ctx, cancel = context.WithCancel(ctx)
|
|
}
|
|
defer cancel()
|
|
for i := range toStop {
|
|
toStop[i].Stop(ctx)
|
|
}
|
|
if c, ok := m.Store.(interface{ Close(context.Context) error }); ok {
|
|
if err := c.Close(ctx); err != nil {
|
|
m.logger.Error("Error closing store: %v", err)
|
|
}
|
|
}
|
|
|
|
if m.stop != nil {
|
|
done := make(chan struct{})
|
|
m.stop <- done
|
|
<-done
|
|
}
|
|
}
|
|
|
|
func (m *Manager) DefaultServiceOpts(config *config.Config) cfg.ServiceOptions {
|
|
return cfg.ServiceOptions{
|
|
Raw: config.Services,
|
|
AuthPlugin: m.AuthPlugin,
|
|
Logger: m.logger,
|
|
Keys: m.keys,
|
|
DistributedTacingOpts: m.distributedTacingOpts,
|
|
}
|
|
}
|
|
|
|
// Reconfigure updates the configuration on the manager.
|
|
func (m *Manager) Reconfigure(newCfg *config.Config) error {
|
|
config := newCfg.Clone()
|
|
|
|
opts := m.DefaultServiceOpts(config)
|
|
|
|
keys, err := keys.ParseKeysConfig(config.Keys)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
opts.Keys = keys
|
|
|
|
services, err := cfg.ParseServicesConfig(opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
interQueryBuiltinCacheConfig, err := cache.ParseCachingConfig(config.Caching)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
|
|
// don't overwrite existing labels, only allow additions - always based on the boostrap config
|
|
if config.Labels == nil {
|
|
config.Labels = m.bootstrapConfigLabels
|
|
} else {
|
|
maps.Copy(config.Labels, m.bootstrapConfigLabels)
|
|
}
|
|
|
|
// don't erase persistence directory
|
|
if config.PersistenceDirectory == nil {
|
|
// update is ok since we have the lock
|
|
config.PersistenceDirectory = m.Config.PersistenceDirectory
|
|
}
|
|
|
|
m.Config = config
|
|
m.interQueryBuiltinCacheConfig = interQueryBuiltinCacheConfig
|
|
|
|
maps.Copy(m.services, services)
|
|
maps.Copy(m.keys, keys)
|
|
|
|
for _, trigger := range m.registeredCacheTriggers {
|
|
trigger(interQueryBuiltinCacheConfig)
|
|
}
|
|
|
|
for _, trigger := range m.registeredNDCacheTriggers {
|
|
trigger(config.NDBuiltinCache)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PluginStatus returns the current statuses of any plugins registered.
|
|
func (m *Manager) PluginStatus() map[string]*Status {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
|
|
return m.copyPluginStatus()
|
|
}
|
|
|
|
// RegisterPluginStatusListener registers a StatusListener to be
// called when plugin status updates occur. A listener previously registered
// under the same name is replaced.
func (m *Manager) RegisterPluginStatusListener(name string, listener StatusListener) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	m.pluginStatusListeners[name] = listener
}
|
|
|
|
// UnregisterPluginStatusListener removes a StatusListener registered with the
// same name. Unregistering an unknown name is a no-op.
func (m *Manager) UnregisterPluginStatusListener(name string) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	delete(m.pluginStatusListeners, name)
}
|
|
|
|
// UpdatePluginStatus updates a named plugins status. Any registered
|
|
// listeners will be called with a copy of the new state of all
|
|
// plugins.
|
|
func (m *Manager) UpdatePluginStatus(pluginName string, status *Status) {
|
|
var toNotify map[string]StatusListener
|
|
var statuses map[string]*Status
|
|
|
|
func() {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
m.pluginStatus[pluginName] = status
|
|
toNotify = make(map[string]StatusListener, len(m.pluginStatusListeners))
|
|
maps.Copy(toNotify, m.pluginStatusListeners)
|
|
statuses = m.copyPluginStatus()
|
|
}()
|
|
|
|
for _, l := range toNotify {
|
|
l(statuses)
|
|
}
|
|
}
|
|
|
|
func (m *Manager) copyPluginStatus() map[string]*Status {
|
|
statusCpy := map[string]*Status{}
|
|
for k, v := range m.pluginStatus {
|
|
var cpy *Status
|
|
if v != nil {
|
|
cpy = &Status{
|
|
State: v.State,
|
|
Message: v.Message,
|
|
}
|
|
}
|
|
statusCpy[k] = cpy
|
|
}
|
|
return statusCpy
|
|
}
|
|
|
|
func (m *Manager) onCommit(ctx context.Context, txn storage.Transaction, event storage.TriggerEvent) {
|
|
compiler := GetCompilerOnContext(event.Context)
|
|
|
|
// If the context does not contain the compiler fallback to loading the
|
|
// compiler from the store. Currently the bundle plugin sets the
|
|
// compiler on the context but the server does not (nor would users
|
|
// implementing their own policy loading.)
|
|
if compiler == nil && event.PolicyChanged() {
|
|
compiler, _ = loadCompilerFromStore(ctx, m.Store, txn, m.enablePrintStatements, m.ParserOptions())
|
|
}
|
|
|
|
if compiler != nil {
|
|
m.setCompiler(compiler)
|
|
|
|
if m.enableTelemetry && event.PolicyChanged() {
|
|
m.opaReportNotifyCh <- struct{}{}
|
|
}
|
|
|
|
for _, f := range m.registeredTriggers {
|
|
f(txn)
|
|
}
|
|
}
|
|
|
|
// Similar to the compiler, look for a set of resolvers on the transaction
|
|
// context. If they are not set we may need to reload from the store.
|
|
resolvers := getWasmResolversOnContext(event.Context)
|
|
if resolvers != nil {
|
|
m.setWasmResolvers(resolvers)
|
|
} else if event.DataChanged() {
|
|
if requiresWasmResolverReload(event) {
|
|
resolvers, err := bundleUtils.LoadWasmResolversFromStore(ctx, m.Store, txn, nil)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
m.setWasmResolvers(resolvers)
|
|
} else {
|
|
err := m.updateWasmResolversData(ctx, event)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func loadCompilerFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, enablePrintStatements bool, popts ast.ParserOptions) (*ast.Compiler, error) {
|
|
policies, err := store.ListPolicies(ctx, txn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
modules := map[string]*ast.Module{}
|
|
|
|
for _, policy := range policies {
|
|
bs, err := store.GetPolicy(ctx, txn, policy)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
module, err := ast.ParseModuleWithOpts(policy, string(bs), popts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
modules[policy] = module
|
|
}
|
|
|
|
compiler := ast.NewCompiler().
|
|
WithEnablePrintStatements(enablePrintStatements)
|
|
|
|
if popts.RegoVersion != ast.RegoUndefined {
|
|
compiler = compiler.WithDefaultRegoVersion(popts.RegoVersion)
|
|
}
|
|
|
|
compiler.Compile(modules)
|
|
return compiler, nil
|
|
}
|
|
|
|
func requiresWasmResolverReload(event storage.TriggerEvent) bool {
|
|
// If the data changes touched the bundle path (which includes
|
|
// the wasm modules) we will reload them. Otherwise update
|
|
// data for each module already on the manager.
|
|
for _, dataEvent := range event.Data {
|
|
if dataEvent.Path.HasPrefix(bundle.BundlesBasePath) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *Manager) updateWasmResolversData(ctx context.Context, event storage.TriggerEvent) error {
|
|
m.wasmResolversMtx.Lock()
|
|
defer m.wasmResolversMtx.Unlock()
|
|
|
|
for _, resolver := range m.wasmResolvers {
|
|
for _, dataEvent := range event.Data {
|
|
var err error
|
|
if dataEvent.Removed {
|
|
err = resolver.RemoveDataPath(ctx, dataEvent.Path)
|
|
} else {
|
|
err = resolver.SetDataPath(ctx, dataEvent.Path, dataEvent.Data)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update wasm runtime data: %s", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PublicKeys returns a public keys that can be used for verifying signed bundles.
|
|
func (m *Manager) PublicKeys() map[string]*keys.Config {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
|
|
if m.keys == nil {
|
|
return make(map[string]*keys.Config)
|
|
}
|
|
|
|
result := make(map[string]*keys.Config, len(m.keys))
|
|
for k, v := range m.keys {
|
|
if v != nil {
|
|
copied := *v
|
|
result[k] = &copied
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// Client returns a client for communicating with a remote service.
|
|
func (m *Manager) Client(name string) rest.Client {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
return m.services[name]
|
|
}
|
|
|
|
// Services returns a list of services that m can provide clients for.
|
|
func (m *Manager) Services() []string {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
s := make([]string, 0, len(m.services))
|
|
for name := range m.services {
|
|
s = append(s, name)
|
|
}
|
|
return s
|
|
}
|
|
|
|
// Logger gets the standard logger for this plugin manager.
|
|
func (m *Manager) Logger() logging.Logger {
|
|
return m.logger
|
|
}
|
|
|
|
// ConsoleLogger gets the console logger for this plugin manager.
|
|
func (m *Manager) ConsoleLogger() logging.Logger {
|
|
return m.consoleLogger
|
|
}
|
|
|
|
func (m *Manager) PrintHook() print.Hook {
|
|
return m.printHook
|
|
}
|
|
|
|
func (m *Manager) EnablePrintStatements() bool {
|
|
return m.enablePrintStatements
|
|
}
|
|
|
|
// ServerInitialized signals a channel indicating that the OPA
|
|
// server has finished initialization.
|
|
func (m *Manager) ServerInitialized() {
|
|
m.serverInitializedOnce.Do(func() { close(m.serverInitialized) })
|
|
}
|
|
|
|
// ServerInitializedChannel returns a receive-only channel that
|
|
// is closed when the OPA server has finished initialization.
|
|
// Be aware that the socket of the server listener may not be
|
|
// open by the time this channel is closed. There is a very
|
|
// small window where the socket may still be closed, due to
|
|
// a race condition.
|
|
func (m *Manager) ServerInitializedChannel() <-chan struct{} {
|
|
return m.serverInitialized
|
|
}
|
|
|
|
// RegisterCacheTrigger accepts a func that receives new inter-query cache config generated by
|
|
// a reconfigure of the plugin manager, so that it can be propagated to existing inter-query caches.
|
|
func (m *Manager) RegisterCacheTrigger(trigger func(*cache.Config)) {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
m.registeredCacheTriggers = append(m.registeredCacheTriggers, trigger)
|
|
}
|
|
|
|
// PrometheusRegister gets the prometheus.Registerer for this plugin manager.
|
|
func (m *Manager) PrometheusRegister() prometheus.Registerer {
|
|
return m.prometheusRegister
|
|
}
|
|
|
|
// TracerProvider gets the *trace.TracerProvider for this plugin manager.
|
|
func (m *Manager) TracerProvider() *trace.TracerProvider {
|
|
return m.tracerProvider
|
|
}
|
|
|
|
func (m *Manager) RegisterNDCacheTrigger(trigger func(bool)) {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
m.registeredNDCacheTriggers = append(m.registeredNDCacheTriggers, trigger)
|
|
}
|
|
|
|
func (m *Manager) sendOPAUpdateLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(time.Duration(int64(time.Second) * defaultUploadIntervalSec))
|
|
mr.New(mr.NewSource(time.Now().UnixNano()))
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
var opaReportNotify bool
|
|
|
|
for {
|
|
select {
|
|
case <-m.opaReportNotifyCh:
|
|
opaReportNotify = true
|
|
case <-ticker.C:
|
|
ticker.Stop()
|
|
|
|
if opaReportNotify {
|
|
opaReportNotify = false
|
|
_, err := m.reporter.SendReport(ctx)
|
|
if err != nil {
|
|
m.logger.WithFields(map[string]any{"err": err}).Debug("Unable to send OPA telemetry report.")
|
|
}
|
|
}
|
|
|
|
newInterval := mr.Int63n(defaultUploadIntervalSec) + defaultUploadIntervalSec
|
|
ticker = time.NewTicker(time.Duration(int64(time.Second) * newInterval))
|
|
case done := <-m.stop:
|
|
cancel()
|
|
ticker.Stop()
|
|
done <- struct{}{}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) ParserOptions() ast.ParserOptions {
|
|
return m.parserOptions
|
|
}
|