unify eventstream creation

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-08-07 14:05:57 +02:00
parent c722f9c77b
commit 49cdcad129
20 changed files with 42 additions and 259 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Nats named connections
Names the nats connections for easier debugging
https://github.com/owncloud/ocis/pull/6979

View File

@@ -87,7 +87,7 @@ func (av Antivirus) Run() error {
evtsCfg.TLSInsecure = false
}
stream, err := stream.NatsFromConfig(stream.NatsConfig(av.c.Events))
stream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events))
if err != nil {
return err
}

View File

@@ -2,17 +2,12 @@ package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
@@ -47,39 +42,11 @@ func Server(cfg *config.Config) *cli.Command {
)
defer cancel()
evtsCfg := cfg.Events
var tlsConf *tls.Config
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
client, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
evts, err := events.Consume(client, evtsCfg.ConsumerGroup, types.RegisteredEvents()...)
evts, err := events.Consume(client, "audit", types.RegisteredEvents()...)
if err != nil {
return err
}

View File

@@ -26,7 +26,6 @@ type Config struct {
type Events struct {
Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;AUDIT_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;AUDIT_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
ConsumerGroup string `yaml:"group" env:"AUDIT_EVENTS_GROUP" desc:"The consumergroup of the service. One group will only get one copy of an event."`
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;AUDIT_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;AUDIT_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided AUDIT_EVENTS_TLS_INSECURE will be seen as false."`
EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;AUDIT_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."`

View File

@@ -24,10 +24,9 @@ func DefaultConfig() *config.Config {
Name: "audit",
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
ConsumerGroup: "audit",
EnableTLS: false,
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
EnableTLS: false,
},
Auditlog: config.Auditlog{
LogToConsole: true,

View File

@@ -60,7 +60,7 @@ func Server(cfg *config.Config) *cli.Command {
metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1)
consumer, err := stream.NatsFromConfig(stream.NatsConfig(cfg.Events))
consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -1,20 +1,15 @@
package http
import (
"crypto/tls"
"crypto/x509"
"fmt"
stdhttp "net/http"
"os"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/owncloud/ocis/v2/ocis-pkg/account"
"github.com/owncloud/ocis/v2/ocis-pkg/cors"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/keycloak"
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
@@ -57,34 +52,7 @@ func Server(opts ...Option) (http.Service, error) {
if options.Config.Events.Endpoint != "" {
var err error
var tlsConf *tls.Config
if options.Config.Events.EnableTLS {
var rootCAPool *x509.CertPool
if options.Config.Events.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(options.Config.Events.TLSRootCACertificate)
if err != nil {
return http.Service{}, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return http.Service{}, err
}
options.Config.Events.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: options.Config.Events.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
publisher, err = stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(options.Config.Events.Endpoint),
natsjs.ClusterID(options.Config.Events.Cluster),
)
publisher, err = stream.NatsFromConfig(options.Config.Service.Name, stream.NatsConfig(options.Config.Events))
if err != nil {
options.Logger.Error().
Err(err).

View File

@@ -2,18 +2,13 @@ package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
@@ -94,40 +89,11 @@ func Server(cfg *config.Config) *cli.Command {
events.SpaceUnshared{},
events.SpaceMembershipExpired{},
}
evtsCfg := cfg.Notifications.Events
var tlsConf *tls.Config
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = crypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
client, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Notifications.Events))
if err != nil {
return err
}
evts, err := events.Consume(client, evtsCfg.ConsumerGroup, evs...)
evts, err := events.Consume(client, "notifications", evs...)
if err != nil {
return err
}

View File

@@ -51,7 +51,6 @@ type SMTP struct {
type Events struct {
Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;NOTIFICATIONS_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;NOTIFICATIONS_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
ConsumerGroup string `yaml:"group" env:"NOTIFICATIONS_EVENTS_GROUP" desc:"Name of the event group / queue on the event system."`
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;NOTIFICATIONS_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;NOTIFICATIONS_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided NOTIFICATIONS_EVENTS_TLS_INSECURE will be seen as false."`
EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;NOTIFICATIONS_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."`

View File

@@ -38,10 +38,9 @@ func DefaultConfig() *config.Config {
Encryption: "none",
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
ConsumerGroup: "notifications",
EnableTLS: false,
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
EnableTLS: false,
},
RevaGateway: shared.DefaultRevaConfig().Address,
},

View File

@@ -2,18 +2,13 @@ package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"os"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
@@ -102,33 +97,8 @@ func Server(cfg *config.Config) *cli.Command {
}
{
var tlsConf *tls.Config
if cfg.Events.EnableTLS {
var rootCAPool *x509.CertPool
if cfg.Events.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(cfg.Events.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
cfg.Events.TLSInsecure = false
}
tlsConf = &tls.Config{
RootCAs: rootCAPool,
}
}
bus, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Events.Endpoint),
natsjs.ClusterID(cfg.Events.Cluster),
)
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
@@ -29,7 +30,7 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := getEventBus(cfg.Postprocessing.Events)
stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events))
if err != nil {
return err
}

View File

@@ -2,17 +2,12 @@ package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/store"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
@@ -53,7 +48,7 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
{
bus, err := getEventBus(cfg.Postprocessing.Events)
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events))
if err != nil {
return err
}
@@ -110,32 +105,3 @@ func Server(cfg *config.Config) *cli.Command {
},
}
}
func getEventBus(evtsCfg config.Events) (events.Stream, error) {
var tlsConf *tls.Config
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return nil, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
RootCAs: rootCAPool,
}
}
return stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
}

View File

@@ -2,11 +2,8 @@ package service
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
"time"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
@@ -17,9 +14,7 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/token"
"github.com/cs3org/reva/v2/pkg/token/manager/jwt"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/jellydator/ttlcache/v2"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0"
@@ -78,33 +73,13 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}
var tlsConf *tls.Config
if cfg.Events.EnableTLS {
var rootCAPool *x509.CertPool
if cfg.Events.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(cfg.Events.TLSRootCACertificate)
if err != nil {
return nil, teardown, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, teardown, err
}
cfg.Events.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: cfg.Events.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
bus, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Events.Endpoint),
natsjs.ClusterID(cfg.Events.Cluster),
)
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
})
if err != nil {
return nil, teardown, err
}

View File

@@ -123,6 +123,7 @@ func SharingConfigFromStruct(cfg *config.Config) map[string]interface{} {
"tls-insecure": cfg.Events.TLSInsecure,
"tls-root-ca-cert": cfg.Events.TLSRootCaCertPath,
"enable-tls": cfg.Events.EnableTLS,
"name": "sharing-eventsmiddleware",
},
"prometheus": map[string]interface{}{
"namespace": "ocis",

View File

@@ -89,7 +89,7 @@ func Server(cfg *config.Config) *cli.Command {
}
{
stream, err := event.NewStream(cfg.Events)
stream, err := event.NewStream(cfg)
if err != nil {
logger.Fatal().Err(err).Msg("can't connect to nats")
}

View File

@@ -32,7 +32,7 @@ func PurgeExpiredResources(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := event.NewStream(cfg.Events)
stream, err := event.NewStream(cfg)
if err != nil {
return err
}

View File

@@ -1,51 +1,18 @@
package event
import (
"crypto/tls"
"crypto/x509"
"os"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/go-micro/plugins/v4/events/natsjs"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config"
"go-micro.dev/v4/events"
)
// NewStream prepares the requested nats stream and returns it.
func NewStream(cfg config.Events) (events.Stream, error) {
var tlsConf *tls.Config
if cfg.EnableTLS {
var rootCAPool *x509.CertPool
if cfg.TLSRootCaCertPath != "" {
rootCrtFile, err := os.Open(cfg.TLSRootCaCertPath)
if err != nil {
return nil, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, err
}
cfg.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
RootCAs: rootCAPool,
}
}
s, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Addr),
natsjs.ClusterID(cfg.ClusterID),
)
if err != nil {
return nil, err
}
return s, nil
func NewStream(cfg *config.Config) (events.Stream, error) {
return stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{
Endpoint: cfg.Events.Addr,
Cluster: cfg.Events.ClusterID,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCaCertPath,
})
}

View File

@@ -47,6 +47,7 @@ func StorageUsersConfigFromStruct(cfg *config.Config) map[string]interface{} {
"tls-insecure": cfg.Events.TLSInsecure,
"tls-root-ca-cert": cfg.Events.TLSRootCaCertPath,
"enable-tls": cfg.Events.EnableTLS,
"name": "storage-users-eventsmiddleware",
},
"prometheus": map[string]interface{}{
"namespace": "ocis",

View File

@@ -81,7 +81,7 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
consumer, err := stream.NatsFromConfig(stream.NatsConfig(cfg.Events))
consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}