diff --git a/changelog/unreleased/fix-graceful-shutdown.md b/changelog/unreleased/fix-graceful-shutdown.md new file mode 100644 index 0000000000..3e596433c3 --- /dev/null +++ b/changelog/unreleased/fix-graceful-shutdown.md @@ -0,0 +1,6 @@ +Bugfix: Fix the graceful shutdown + +Fix the graceful shutdown using the new ocis and reva runners. + +https://github.com/owncloud/ocis/pull/11295 +https://github.com/owncloud/ocis/issues/11170 \ No newline at end of file diff --git a/opencloud/pkg/runtime/service/service.go b/opencloud/pkg/runtime/service/service.go index bbf92aa082..ab161e8233 100644 --- a/opencloud/pkg/runtime/service/service.go +++ b/opencloud/pkg/runtime/service/service.go @@ -2,13 +2,15 @@ package service import ( "context" + "errors" "fmt" "net" "net/http" "net/rpc" - "os" + "os/signal" "sort" "strings" + "sync" "time" "github.com/cenkalti/backoff" @@ -16,6 +18,7 @@ import ( "github.com/olekukonko/tablewriter" occfg "github.com/opencloud-eu/opencloud/pkg/config" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/shared" activitylog "github.com/opencloud-eu/opencloud/services/activitylog/pkg/command" @@ -358,8 +361,9 @@ func Start(ctx context.Context, o ...Option) error { return err } - // get a cancel function to stop the service - ctx, cancel := context.WithCancel(ctx) + // cancel the context when a signal is received. + notifyCtx, cancel := signal.NotifyContext(ctx, runner.StopSignals...) + defer cancel() // tolerance controls backoff cycles from the supervisor. tolerance := 5 @@ -397,6 +401,7 @@ func Start(ctx context.Context, o ...Option) error { if err != nil { s.Log.Fatal().Err(err).Msg("could not start listener") } + srv := new(http.Server) defer func() { if r := recover(); r != nil { @@ -404,7 +409,6 @@ func Start(ctx context.Context, o ...Option) error { if _, err = net.Dial("tcp", net.JoinHostPort(s.cfg.Runtime.Host, s.cfg.Runtime.Port)); err != nil { reason.WriteString("runtime address already in use") } - fmt.Println(reason.String()) } }() @@ -417,10 +421,7 @@ func Start(ctx context.Context, o ...Option) error { // go supervisor.Serve() // because that will briefly create a race condition as it starts up, if you try to .Add() services immediately afterward. // https://pkg.go.dev/github.com/thejerf/suture/v4@v4.0.0#Supervisor - go s.Supervisor.ServeBackground(s.context) - - // trap will block on context done channel for interruptions. - go trap(s, ctx) + go s.Supervisor.ServeBackground(s.context) // TODO Why does Supervisor uses s.context? for i, service := range s.Services { scheduleServiceTokens(s, service) @@ -434,7 +435,15 @@ func Start(ctx context.Context, o ...Option) error { // schedule services that are optional scheduleServiceTokens(s, s.Additional) - return http.Serve(l, nil) + go func() { + if err = srv.Serve(l); err != nil && !errors.Is(err, http.ErrServerClosed) { + s.Log.Fatal().Err(err).Msg("could not start rpc server") + } + }() + + // trapShutdownCtx will block on the context-done channel for interruptions. + trapShutdownCtx(s, srv, notifyCtx) + return nil } // scheduleServiceTokens adds service tokens to the service supervisor. @@ -501,20 +510,51 @@ func (s *Service) List(_ struct{}, reply *string) error { return nil } -// trap blocks on halt channel. When the runtime is interrupted it -// signals the controller to stop any supervised process. -func trap(s *Service, ctx context.Context) { +func trapShutdownCtx(s *Service, srv *http.Server, ctx context.Context) { <-ctx.Done() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // TODO: To discuss the default timeout + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + s.Log.Error().Err(err).Msg("could not shutdown tcp listener") + return + } + s.Log.Info().Msg("tcp listener shutdown") + }() + for sName := range s.serviceToken { for i := range s.serviceToken[sName] { - if err := s.Supervisor.Remove(s.serviceToken[sName][i]); err != nil { - s.Log.Error().Err(err).Str("service", "runtime service").Msgf("terminating with signal: %v", s) - } + wg.Add(1) + go func() { + s.Log.Warn().Msgf("=== RemoveAndWait for %s", sName) + defer wg.Done() + // TODO: To discuss the default timeout + if err := s.Supervisor.RemoveAndWait(s.serviceToken[sName][i], 20*time.Second); err != nil && !errors.Is(err, suture.ErrSupervisorNotRunning) { + s.Log.Error().Err(err).Str("service", sName).Msgf("terminating with signal: %+v", s) + } + s.Log.Warn().Msgf("=== Done RemoveAndWait for %s", sName) + }() } } - s.Log.Debug().Str("service", "runtime service").Msgf("terminating with signal: %v", s) - time.Sleep(3 * time.Second) // give the services time to deregister - os.Exit(0) // FIXME this cause an early exit that prevents services from shitting down properly + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + // TODO: To discuss the default timeout + case <-time.After(30 * time.Second): + s.Log.Fatal().Msg("ocis graceful shutdown timeout reached, terminating") + case <-done: + s.Log.Info().Msg("all ocis services gracefully stopped") + return + } } // pingNats will attempt to connect to nats, blocking until a connection is established diff --git a/pkg/runner/factory.go b/pkg/runner/factory.go index e7925941e4..37767ddde4 100644 --- a/pkg/runner/factory.go +++ b/pkg/runner/factory.go @@ -9,6 +9,7 @@ import ( ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" ohttp "github.com/opencloud-eu/opencloud/pkg/service/http" + "github.com/opencloud-eu/reva/v2/cmd/revad/runtime" "google.golang.org/grpc" ) @@ -102,7 +103,8 @@ func NewGolangHttpServerRunner(name string, server *http.Server, opts ...Option) // Since Shutdown might take some time, don't block go func() { // give 5 secs for the shutdown to finish - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + // TODO: To discuss the default timeout + shutdownCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() debugCh <- server.Shutdown(shutdownCtx) @@ -132,3 +134,23 @@ func NewGolangGrpcServerRunner(name string, server *grpc.Server, listener net.Li return r } + +func NewRevaServiceRunner(name string, server runtime.RevaDrivenServer, opts ...Option) *Runner { + httpCh := make(chan error, 1) + r := New(name, func() error { + // start the server and return if it fails + if err := server.Start(); err != nil { + return err + } + return <-httpCh // wait for the result + }, func() { + // stop implies deregistering and waiting for the request to finish, + // so don't block + go func() { + httpCh <- server.Stop() // stop and send a result through a channel + close(httpCh) + }() + }, opts...) + + return r +} diff --git a/pkg/runner/option.go b/pkg/runner/option.go index af1dca0455..649deb14ed 100644 --- a/pkg/runner/option.go +++ b/pkg/runner/option.go @@ -7,10 +7,12 @@ import ( var ( // DefaultInterruptDuration is the default value for the `WithInterruptDuration` // for the "regular" runners. This global value can be adjusted if needed. - DefaultInterruptDuration = 10 * time.Second + // TODO: To discuss the default timeout + DefaultInterruptDuration = 20 * time.Second // DefaultGroupInterruptDuration is the default value for the `WithInterruptDuration` // for the group runners. This global value can be adjusted if needed. - DefaultGroupInterruptDuration = 15 * time.Second + // TODO: To discuss the default timeout + DefaultGroupInterruptDuration = 25 * time.Second ) // Option defines a single option function. diff --git a/services/app-provider/pkg/command/server.go b/services/app-provider/pkg/command/server.go index 16570e3e8b..0790f3b42d 100644 --- a/services/app-provider/pkg/command/server.go +++ b/services/app-provider/pkg/command/server.go @@ -4,15 +4,16 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/cmd/revad/runtime" "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/app-provider/pkg/config" @@ -37,66 +38,58 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.AppProviderConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("app-provider_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("app-provider_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/app-registry/pkg/command/server.go b/services/app-registry/pkg/command/server.go index 184ee32281..aa6e2c91f7 100644 --- a/services/app-registry/pkg/command/server.go +++ b/services/app-registry/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/app-registry/pkg/config" @@ -36,67 +37,59 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() + + { - gr.Add(func() error { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.AppRegistryConfigFromStruct(cfg, logger) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("app-registry_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("app-registry_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/auth-app/pkg/command/server.go b/services/auth-app/pkg/command/server.go index 91fbebdb9b..51390cd964 100644 --- a/services/auth-app/pkg/command/server.go +++ b/services/auth-app/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -44,60 +45,43 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() + { - gr.Add(func() error { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.AuthAppConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) + gr.Add(runner.NewRevaServiceRunner("auth-app_revad", revaSrv)) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("auth-app_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { @@ -128,24 +112,32 @@ func Server(cfg *config.Config) *cli.Command { return err } - rClient := settingssvc.NewRoleService("eu.opencloud.api.settings", grpcClient) - server, err := http.Server( - http.Logger(logger), - http.Context(ctx), - http.Config(cfg), - http.GatewaySelector(gatewaySelector), - http.RoleClient(rClient), - http.TracerProvider(traceProvider), - ) - if err != nil { - logger.Fatal().Err(err).Msg("failed to initialize http server") + { + rClient := settingssvc.NewRoleService("eu.opencloud.api.settings", grpcClient) + server, err := http.Server( + http.Logger(logger), + http.Context(ctx), + http.Config(cfg), + http.GatewaySelector(gatewaySelector), + http.RoleClient(rClient), + http.TracerProvider(traceProvider), + ) + if err != nil { + logger.Fatal().Err(err).Msg("failed to initialize http server") + } + + gr.Add(runner.NewGoMicroHttpServerRunner("auth-app_http", server)) } - gr.Add(server.Run, func(err error) { - logger.Error().Err(err).Str("server", "http").Msg("shutting down server") - }) + grResults := gr.Run(ctx) - return gr.Run() + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/auth-basic/pkg/command/server.go b/services/auth-basic/pkg/command/server.go index 3ae1dffdf8..aa86c85395 100644 --- a/services/auth-basic/pkg/command/server.go +++ b/services/auth-basic/pkg/command/server.go @@ -4,13 +4,14 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/ldap" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/auth-basic/pkg/config" @@ -37,10 +38,15 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + + gr := runner.NewGroup() // the reva runtime calls `os.Exit` in the case of a failure and there is no way for the OpenCloud // runtime to catch it and restart a reva service. Therefore, we need to ensure the service has @@ -54,62 +60,47 @@ func Server(cfg *config.Config) *cli.Command { } } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) - - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.AuthBasicConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("auth-basic_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("auth-basic_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/auth-bearer/pkg/command/server.go b/services/auth-bearer/pkg/command/server.go index 84c34183ab..96d4028c6e 100644 --- a/services/auth-bearer/pkg/command/server.go +++ b/services/auth-bearer/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/auth-bearer/pkg/config" @@ -36,67 +37,57 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.AuthBearerConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("auth-bearer_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("auth-bearer_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/auth-machine/pkg/command/server.go b/services/auth-machine/pkg/command/server.go index b182c24b44..0b3ce11d0a 100644 --- a/services/auth-machine/pkg/command/server.go +++ b/services/auth-machine/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/auth-machine/pkg/config" @@ -36,67 +37,57 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.AuthMachineConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("auth-machine_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("auth-machine_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/auth-service/pkg/command/server.go b/services/auth-service/pkg/command/server.go index 8f70dcdee6..90f24d83e7 100644 --- a/services/auth-service/pkg/command/server.go +++ b/services/auth-service/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/auth-service/pkg/config" @@ -36,68 +37,57 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") - reg := registry.GetRegistry() + gr := runner.NewGroup() - rcfg := revaconfig.AuthMachineConfigFromStruct(cfg) + { + pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") + reg := registry.GetRegistry() + rcfg := revaconfig.AuthMachineConfigFromStruct(cfg) - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) - - gr.Add(func() error { - runtime.RunWithOptions(rcfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rcfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("auth-service_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("auth-service_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/frontend/pkg/command/server.go b/services/frontend/pkg/command/server.go index bc78570519..ea0da1b7a7 100644 --- a/services/frontend/pkg/command/server.go +++ b/services/frontend/pkg/command/server.go @@ -4,15 +4,16 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/cmd/revad/runtime" "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/frontend/pkg/config" @@ -37,64 +38,45 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() - - rCfg, err := revaconfig.FrontendConfigFromStruct(cfg, logger) - if err != nil { - return err + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { + rCfg, err := revaconfig.FrontendConfigFromStruct(cfg, logger) + if err != nil { + return err + } pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("frontend_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("frontend_debug", debugServer)) + } httpSvc := registry.BuildHTTPService(cfg.HTTP.Namespace+"."+cfg.Service.Name, cfg.HTTP.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, httpSvc, cfg.Debug.Addr); err != nil { @@ -102,13 +84,23 @@ func Server(cfg *config.Config) *cli.Command { } // add event handler - gr.Add(func() error { - return ListenForEvents(ctx, cfg, logger) - }, func(_ error) { - cancel() - }) + gr.Add(runner.New("frontend_event", + func() error { + return ListenForEvents(ctx, cfg, logger) + }, func() { + logger.Info().Msg("stopping event handler") + }, + )) - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/gateway/pkg/command/server.go b/services/gateway/pkg/command/server.go index 20b5ed752a..e131a12749 100644 --- a/services/gateway/pkg/command/server.go +++ b/services/gateway/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/gateway/pkg/config" @@ -36,69 +37,58 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.GatewayConfigFromStruct(cfg, logger) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - logger.Info(). - Str("server", cfg.Service.Name). - Msg("reva runtime exited") - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("gateway_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("gateway_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/groups/pkg/command/server.go b/services/groups/pkg/command/server.go index 6371e1b146..2282e786e8 100644 --- a/services/groups/pkg/command/server.go +++ b/services/groups/pkg/command/server.go @@ -4,13 +4,14 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/ldap" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/groups/pkg/config" @@ -37,10 +38,6 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - - defer cancel() // the reva runtime calls os.Exit in the case of a failure and there is no way for the OpenCloud // runtime to catch it and restart a reva service. Therefore we need to ensure the service has @@ -54,62 +51,57 @@ func Server(cfg *config.Config) *cli.Command { } } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - gr.Add(func() error { + gr := runner.NewGroup() + + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.GroupsConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("groups_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("groups_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/ocm/pkg/command/server.go b/services/ocm/pkg/command/server.go index 900be491ba..f0f955eea4 100644 --- a/services/ocm/pkg/command/server.go +++ b/services/ocm/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/ocm/pkg/config" @@ -36,61 +37,43 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - rCfg := revaconfig.OCMConfigFromStruct(cfg, logger) + gr := runner.NewGroup() - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) - - gr.Add(func() error { + { + rCfg := revaconfig.OCMConfigFromStruct(cfg, logger) pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("ocm_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("ocm_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { @@ -102,7 +85,15 @@ func Server(cfg *config.Config) *cli.Command { logger.Fatal().Err(err).Msg("failed to register the http service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/settings/pkg/command/server.go b/services/settings/pkg/command/server.go index 4ae0942c64..67bd695378 100644 --- a/services/settings/pkg/command/server.go +++ b/services/settings/pkg/command/server.go @@ -55,7 +55,7 @@ func Server(cfg *config.Config) *cli.Command { handle := svc.NewDefaultLanguageService(cfg, svc.NewService(cfg, logger)) - servers := runner.NewGroup() + gr := runner.NewGroup() // prepare an HTTP server and add it to the group run. httpServer, err := http.Server( @@ -73,7 +73,7 @@ func Server(cfg *config.Config) *cli.Command { Msg("Error initializing http service") return fmt.Errorf("could not initialize http service: %w", err) } - servers.Add(runner.NewGoMicroHttpServerRunner("settings_http", httpServer)) + gr.Add(runner.NewGoMicroHttpServerRunner("settings_http", httpServer)) // prepare a gRPC server and add it to the group run. grpcServer := grpc.Server( @@ -85,7 +85,7 @@ func Server(cfg *config.Config) *cli.Command { grpc.ServiceHandler(handle), grpc.TraceProvider(traceProvider), ) - servers.Add(runner.NewGoMicroGrpcServerRunner("settings_grpc", grpcServer)) + gr.Add(runner.NewGoMicroGrpcServerRunner("settings_grpc", grpcServer)) // prepare a debug server and add it to the group run. debugServer, err := debug.Server( @@ -98,9 +98,9 @@ func Server(cfg *config.Config) *cli.Command { return err } - servers.Add(runner.NewGolangHttpServerRunner("settings_debug", debugServer)) + gr.Add(runner.NewGolangHttpServerRunner("settings_debug", debugServer)) - grResults := servers.Run(ctx) + grResults := gr.Run(ctx) // return the first non-nil error found in the results for _, grResult := range grResults { diff --git a/services/sharing/pkg/command/server.go b/services/sharing/pkg/command/server.go index 0193406a56..6b29f4ace5 100644 --- a/services/sharing/pkg/command/server.go +++ b/services/sharing/pkg/command/server.go @@ -4,13 +4,14 @@ import ( "context" "fmt" "os" + "os/signal" "path" "path/filepath" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/sharing/pkg/config" @@ -37,10 +38,6 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - - defer cancel() // precreate folders if cfg.UserSharingDriver == "json" && cfg.UserSharingDrivers.JSON.File != "" { @@ -54,14 +51,16 @@ func Server(cfg *config.Config) *cli.Command { } } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - gr.Add(func() error { + gr := runner.NewGroup() + + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg, err := revaconfig.SharingConfigFromStruct(cfg, logger) if err != nil { @@ -69,50 +68,43 @@ func Server(cfg *config.Config) *cli.Command { } reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("sharing_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("sharing_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/storage-publiclink/pkg/command/server.go b/services/storage-publiclink/pkg/command/server.go index 95ca4a4f21..50b937dbf0 100644 --- a/services/storage-publiclink/pkg/command/server.go +++ b/services/storage-publiclink/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/storage-publiclink/pkg/config" @@ -36,67 +37,58 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.StoragePublicLinkConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("storage-publiclink_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("storage-publiclink_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/storage-shares/pkg/command/server.go b/services/storage-shares/pkg/command/server.go index a0ccf04829..1abce31c41 100644 --- a/services/storage-shares/pkg/command/server.go +++ b/services/storage-shares/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/storage-shares/pkg/config" @@ -36,67 +37,58 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.StorageSharesConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("storage-shares_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("storage-shares_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/storage-system/pkg/command/server.go b/services/storage-system/pkg/command/server.go index 7b98cdabb0..f8882b841c 100644 --- a/services/storage-system/pkg/command/server.go +++ b/services/storage-system/pkg/command/server.go @@ -4,15 +4,16 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/cmd/revad/runtime" "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/storage-system/pkg/config" @@ -37,60 +38,43 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.StorageSystemFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("storage-system_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("storage-system_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { @@ -102,7 +86,15 @@ func Server(cfg *config.Config) *cli.Command { logger.Fatal().Err(err).Msg("failed to register the http service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/storage-users/pkg/command/server.go b/services/storage-users/pkg/command/server.go index 9e1d881c17..338e0f6d76 100644 --- a/services/storage-users/pkg/command/server.go +++ b/services/storage-users/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/storage-users/pkg/config" @@ -38,60 +39,42 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + gr := runner.NewGroup() - gr.Add(func() error { + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.StorageUsersConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("storage-users_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(err error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("storage-users_debug", debugServer)) + } grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) if err := registry.RegisterService(ctx, logger, grpcSvc, cfg.Debug.Addr); err != nil { @@ -113,25 +96,19 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { logger.Fatal().Err(err).Msg("can't create event handler") } - - gr.Add(eventSVC.Run, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "stream"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "stream"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + // The event service Run() function handles the stop signal itself + go eventSVC.Run() } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/users/pkg/command/server.go b/services/users/pkg/command/server.go index fc26b68501..e215b5e385 100644 --- a/services/users/pkg/command/server.go +++ b/services/users/pkg/command/server.go @@ -4,13 +4,14 @@ import ( "context" "fmt" "os" + "os/signal" "path" "github.com/gofrs/uuid" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/ldap" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/users/pkg/config" @@ -37,10 +38,6 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - - defer cancel() // the reva runtime calls os.Exit in the case of a failure and there is no way for the OpenCloud // runtime to catch it and restart a reva service. Therefore we need to ensure the service has @@ -54,55 +51,41 @@ func Server(cfg *config.Config) *cli.Command { } } - // make sure the run group executes all interrupt handlers when the context is canceled - gr.Add(func() error { - <-ctx.Done() - return nil - }, func(_ error) { - }) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - gr.Add(func() error { + gr := runner.NewGroup() + + { pidFile := path.Join(os.TempDir(), "revad-"+cfg.Service.Name+"-"+uuid.Must(uuid.NewV4()).String()+".pid") rCfg := revaconfig.UsersConfigFromStruct(cfg) reg := registry.GetRegistry() - runtime.RunWithOptions(rCfg, pidFile, + revaSrv := runtime.RunDrivenServerWithOptions(rCfg, pidFile, runtime.WithLogger(&logger.Logger), runtime.WithRegistry(reg), runtime.WithTraceProvider(traceProvider), ) - - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "reva"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) - - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") - return err + gr.Add(runner.NewRevaServiceRunner("users_revad", revaSrv)) } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Info().Err(err).Str("server", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner("users_debug", debugServer)) + } // FIXME we should defer registering the service until we are sure that reva is running grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Protocol, cfg.GRPC.Addr, version.GetString()) @@ -110,7 +93,15 @@ func Server(cfg *config.Config) *cli.Command { logger.Fatal().Err(err).Msg("failed to register the grpc service") } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace/grace.go b/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace/grace.go index afdc6da6b7..0366d05558 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace/grace.go +++ b/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace/grace.go @@ -26,6 +26,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "syscall" "time" @@ -88,13 +89,18 @@ func NewWatcher(opts ...Option) *Watcher { // Exit exits the current process cleaning up // existing pid files. func (w *Watcher) Exit(errc int) { + w.Clean() + os.Exit(errc) +} + +// Clean removes the pid file. +func (w *Watcher) Clean() { err := w.clean() if err != nil { w.log.Warn().Err(err).Msg("error removing pid file") } else { w.log.Info().Msgf("pid file %q got removed", w.pidFile) } - os.Exit(errc) } func (w *Watcher) clean() error { @@ -266,7 +272,7 @@ type Server interface { // TrapSignals captures the OS signal. func (w *Watcher) TrapSignals() { signalCh := make(chan os.Signal, 1024) - signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT) + signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) for { s := <-signalCh w.log.Info().Msgf("%v signal received", s) @@ -284,14 +290,9 @@ func (w *Watcher) TrapSignals() { w.log.Info().Msgf("child forked with new pid %d", p.Pid) w.childPIDs = append(w.childPIDs, p.Pid) } - - case syscall.SIGQUIT: - gracefulShutdown(w) - case syscall.SIGINT, syscall.SIGTERM: - if w.gracefulShutdownTimeout == 0 { - hardShutdown(w) - } + case syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM: gracefulShutdown(w) + return } } } @@ -300,38 +301,43 @@ func (w *Watcher) TrapSignals() { // exit() is problematic (i.e. racey) especiaily when orchestrating multiple // reva services from some external runtime (like in the "opencloud server" case func gracefulShutdown(w *Watcher) { + defer w.Clean() w.log.Info().Int("Timeout", w.gracefulShutdownTimeout).Msg("preparing for a graceful shutdown with deadline") + wg := sync.WaitGroup{} + + for _, s := range w.ss { + wg.Add(1) + go func() { + defer wg.Done() + w.log.Info().Msgf("fd to %s:%s gracefully closed", s.Network(), s.Address()) + err := s.GracefulStop() + if err != nil { + w.log.Error().Err(err).Msg("error stopping server") + } + }() + } + + done := make(chan struct{}) go func() { - count := w.gracefulShutdownTimeout - ticker := time.NewTicker(time.Second) - for ; true; <-ticker.C { - w.log.Info().Msgf("shutting down in %d seconds", count-1) - count-- - if count <= 0 { - w.log.Info().Msg("deadline reached before draining active conns, hard stopping ...") - for _, s := range w.ss { - err := s.Stop() - if err != nil { - w.log.Error().Err(err).Msg("error stopping server") - } - w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address()) - } - w.Exit(1) + wg.Wait() + close(done) + }() + + select { + case <-time.After(time.Duration(w.gracefulShutdownTimeout) * time.Second): + w.log.Info().Msg("graceful shutdown timeout reached. running hard shutdown") + for _, s := range w.ss { + w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address()) + err := s.Stop() + if err != nil { + w.log.Error().Err(err).Msg("error stopping server") } } - }() - for _, s := range w.ss { - w.log.Info().Msgf("fd to %s:%s gracefully closed ", s.Network(), s.Address()) - err := s.GracefulStop() - if err != nil { - w.log.Error().Err(err).Msg("error stopping server") - w.log.Info().Msg("exit with error code 1") - - w.Exit(1) - } + return + case <-done: + w.log.Info().Msg("all servers gracefully stopped") + return } - w.log.Info().Msg("exit with error code 0") - w.Exit(0) } // TODO: Ideally this would call exit() but properly return an error. The diff --git a/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/runtime/drivenserver.go b/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/runtime/drivenserver.go new file mode 100644 index 0000000000..b9eb819ece --- /dev/null +++ b/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/runtime/drivenserver.go @@ -0,0 +1,178 @@ +package runtime + +import ( + "errors" + "fmt" + "net" + "net/http" + "os" + "time" + + "github.com/opencloud-eu/reva/v2/pkg/registry" + "github.com/rs/zerolog" +) + +const ( + HTTP = iota + GRPC +) + +// RevaDrivenServer is an interface that defines the methods for starting and stopping reva HTTP/GRPC services. +type RevaDrivenServer interface { + Start() error + Stop() error +} + +// revaServer is an interface that defines the methods for starting and stopping a reva server. +type revaServer interface { + Start(ln net.Listener) error + Stop() error + GracefulStop() error + Network() string + Address() string +} + +// sever represents a generic reva server that implements the RevaDrivenServer interface. +type server struct { + srv revaServer + log *zerolog.Logger + gracefulShutdownTimeout time.Duration + protocol string +} + +// NewDrivenHTTPServerWithOptions runs a revad server w/o watcher with the given config file and options. +// Use it in cases where you want to run a revad server without the need for a watcher and the os signal handling as a part of another runtime. +// Returns nil if no http server is configured in the config file. +// The GracefulShutdownTimeout set to default 20 seconds and can be overridden in the core config. +// Logging a fatal error and exit with code 1 if the http server cannot be created. +func NewDrivenHTTPServerWithOptions(mainConf map[string]interface{}, opts ...Option) RevaDrivenServer { + if !isEnabledHTTP(mainConf) { + return nil + } + options := newOptions(opts...) + if srv := newServer(HTTP, mainConf, options); srv != nil { + return srv + } + options.Logger.Fatal().Msg("nothing to do, no http enabled_services declared in config") + return nil +} + +// NewDrivenGRPCServerWithOptions runs a revad server w/o watcher with the given config file and options. +// Use it in cases where you want to run a revad server without the need for a watcher and the os signal handling as a part of another runtime. +// Returns nil if no grpc server is configured in the config file. +// The GracefulShutdownTimeout set to default 20 seconds and can be overridden in the core config. +// Logging a fatal error and exit with code 1 if the grpc server cannot be created. +func NewDrivenGRPCServerWithOptions(mainConf map[string]interface{}, opts ...Option) RevaDrivenServer { + if !isEnabledGRPC(mainConf) { + return nil + } + options := newOptions(opts...) + if srv := newServer(GRPC, mainConf, options); srv != nil { + return srv + } + options.Logger.Fatal().Msg("nothing to do, no grpc enabled_services declared in config") + return nil +} + +// Start starts the reva server, listening on the configured address and network. +func (s *server) Start() error { + if s.srv == nil { + err := fmt.Errorf("reva %s server not initialized", s.protocol) + s.log.Fatal().Err(err).Send() + return err + } + ln, err := net.Listen(s.srv.Network(), s.srv.Address()) + if err != nil { + s.log.Fatal().Err(err).Send() + return err + } + if err = s.srv.Start(ln); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + s.log.Error().Err(err).Msgf("reva %s server error", s.protocol) + } + return err + } + return nil +} + +// Stop gracefully stops the reva server, waiting for the graceful shutdown timeout. +func (s *server) Stop() error { + if s.srv == nil { + return nil + } + done := make(chan struct{}) + go func() { + s.log.Info().Msgf("gracefully stopping %s:%s reva %s server", s.srv.Network(), s.srv.Address(), s.protocol) + if err := s.srv.GracefulStop(); err != nil { + s.log.Error().Err(err).Msgf("error gracefully stopping reva %s server", s.protocol) + s.srv.Stop() + } + close(done) + }() + + select { + case <-time.After(s.gracefulShutdownTimeout): + s.log.Info().Msg("graceful shutdown timeout reached. running hard shutdown") + err := s.srv.Stop() + if err != nil { + s.log.Error().Err(err).Msgf("error stopping reva %s server", s.protocol) + } + return nil + case <-done: + s.log.Info().Msgf("reva %s server gracefully stopped", s.protocol) + return nil + } +} + +// newServer runs a revad server w/o watcher with the given config file and options. +func newServer(protocol int, mainConf map[string]interface{}, options Options) RevaDrivenServer { + parseSharedConfOrDie(mainConf["shared"]) + coreConf := parseCoreConfOrDie(mainConf["core"]) + log := options.Logger + + if err := registry.Init(options.Registry); err != nil { + log.Fatal().Err(err).Msg("failed to initialize registry client") + return nil + } + + host, _ := os.Hostname() + log.Info().Msgf("host info: %s", host) + + // Only initialize tracing if we didn't get a tracer provider. + if options.TraceProvider == nil { + log.Debug().Msg("no pre-existing tracer given, initializing tracing") + options.TraceProvider = initTracing(coreConf) + } + initCPUCount(coreConf, log) + + gracefulShutdownTimeout := 20 * time.Second + if coreConf.GracefulShutdownTimeout > 0 { + gracefulShutdownTimeout = time.Duration(coreConf.GracefulShutdownTimeout) * time.Second + } + + srv := &server{ + log: options.Logger, + gracefulShutdownTimeout: gracefulShutdownTimeout, + } + switch protocol { + case HTTP: + s, err := getHTTPServer(mainConf["http"], options.Logger, options.TraceProvider) + if err != nil { + options.Logger.Fatal().Err(err).Msg("error creating http server") + return nil + } + srv.srv = s + srv.protocol = "http" + return srv + case GRPC: + s, err := getGRPCServer(mainConf["grpc"], options.Logger, options.TraceProvider) + if err != nil { + options.Logger.Fatal().Err(err).Msg("error creating grpc server") + return nil + } + srv.srv = s + srv.protocol = "grpc" + return srv + } + return nil +} diff --git a/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/runtime/runtime.go b/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/runtime/runtime.go index 4e7dc3d73f..2ae269195a 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/runtime/runtime.go +++ b/vendor/github.com/opencloud-eu/reva/v2/cmd/revad/runtime/runtime.go @@ -53,7 +53,8 @@ func RunWithOptions(mainConf map[string]interface{}, pidFile string, opts ...Opt coreConf := parseCoreConfOrDie(mainConf["core"]) if err := registry.Init(options.Registry); err != nil { - panic(err) + options.Logger.Fatal().Err(err).Msg("failed to initialize registry client") + return } run(mainConf, coreConf, options.Logger, options.TraceProvider, pidFile) diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/rgrpc/rgrpc.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/rgrpc/rgrpc.go index 8adb98afe4..43e4821d35 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/rgrpc/rgrpc.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/rgrpc/rgrpc.go @@ -279,15 +279,15 @@ func (s *Server) cleanupServices() { // Stop stops the server. func (s *Server) Stop() error { - s.cleanupServices() s.s.Stop() + s.cleanupServices() return nil } // GracefulStop gracefully stops the server. func (s *Server) GracefulStop() error { - s.cleanupServices() s.s.GracefulStop() + s.cleanupServices() return nil } diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/rhttp/rhttp.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/rhttp/rhttp.go index 3d6919b509..6636280b60 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/rhttp/rhttp.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/rhttp/rhttp.go @@ -132,10 +132,10 @@ func (s *Server) Start(ln net.Listener) error { // Stop stops the server. func (s *Server) Stop() error { - s.closeServices() // TODO(labkode): set ctx deadline to zero ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() + defer s.closeServices() return s.httpServer.Shutdown(ctx) } @@ -164,7 +164,7 @@ func (s *Server) Address() string { // GracefulStop gracefully stops the server. func (s *Server) GracefulStop() error { - s.closeServices() + defer s.closeServices() return s.httpServer.Shutdown(context.Background()) }