feat: use runners to startup the services

This commit is contained in:
Juan Pablo Villafáñez
2024-05-02 14:10:32 +02:00
committed by Jörn Friedrich Dreyer
parent 61796a2b0b
commit 9e1b80a1be
33 changed files with 997 additions and 830 deletions

View File

@@ -4,12 +4,13 @@ import (
"context"
"fmt"
"os"
"os/signal"
"github.com/oklog/run"
"github.com/opencloud-eu/reva/v2/pkg/store"
"github.com/urfave/cli/v2"
microstore "go-micro.dev/v4/store"
"github.com/opencloud-eu/opencloud/pkg/runner"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config/parser"
@@ -33,18 +34,21 @@ func Server(cfg *config.Config) *cli.Command {
return err
},
Action: func(c *cli.Context) error {
var (
gr = run.Group{}
logger = logging.Configure(cfg.Service.Name, cfg.Log)
ctx, cancel = context.WithCancel(c.Context)
)
defer cancel()
logger := logging.Configure(cfg.Service.Name, cfg.Log)
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}
traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
if err != nil {
return err
}
gr := runner.NewGroup()
{
st := store.Create(
store.Store(cfg.Store.Store),
@@ -59,30 +63,12 @@ func Server(cfg *config.Config) *cli.Command {
if err != nil {
return err
}
gr.Add(func() error {
err := make(chan error, 1)
select {
case <-ctx.Done():
return nil
case err <- svc.Run():
return <-err
}
}, func(err error) {
if err != nil {
logger.Info().
Str("transport", "stream").
Str("server", cfg.Service.Name).
Msg("Shutting down server")
} else {
logger.Error().Err(err).
Str("transport", "stream").
Str("server", cfg.Service.Name).
Msg("Shutting down server")
}
cancel()
})
gr.Add(runner.New("postprocessing_svc", func() error {
return svc.Run()
}, func() {
svc.Close()
}))
}
{
@@ -96,12 +82,18 @@ func Server(cfg *config.Config) *cli.Command {
return err
}
gr.Add(debugServer.ListenAndServe, func(_ error) {
_ = debugServer.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("postprocessing_debug", debugServer))
}
return gr.Run()
grResults := gr.Run(ctx)
// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/opencloud-eu/opencloud/pkg/generators"
@@ -34,6 +35,8 @@ type PostprocessingService struct {
c config.Postprocessing
tp trace.TracerProvider
metrics *metrics.Metrics
stopCh chan struct{}
stopped atomic.Bool
}
var (
@@ -97,6 +100,7 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.
c: cfg.Postprocessing,
tp: tp,
metrics: m,
stopCh: make(chan struct{}, 1),
}, nil
}
@@ -108,26 +112,66 @@ func (pps *PostprocessingService) Run() error {
wg.Add(1)
go func() {
defer wg.Done()
for e := range pps.events {
if err := pps.processEvent(e); err != nil {
switch {
case errors.Is(err, ErrFatal):
pps.log.Fatal().Err(err).Msg("fatal error - exiting")
case errors.Is(err, ErrEvent):
pps.log.Error().Err(err).Msg("continuing")
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
EventLoop:
for {
select {
case <-pps.stopCh:
// stop requested
// TODO: we might need a way to unsubscribe from the event channel, otherwise
// we'll be leaking a goroutine in reva that will be stuck waiting for
// someone to read from the event channel.
// Note: redis implementation seems to have a timeout, so the goroutine
// will exit if there is nobody processing the events and the timeout
// is reached. The behavior is unclear with natsjs
break EventLoop
case e, ok := <-pps.events:
if !ok {
// event channel is closed, so nothing more to do
break EventLoop
}
err := pps.processEvent(e)
if err != nil {
switch {
case errors.Is(err, ErrFatal):
pps.log.Fatal().Err(err).Msg("fatal error - exiting")
case errors.Is(err, ErrEvent):
pps.log.Error().Err(err).Msg("continuing")
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
}
}
if pps.stopped.Load() {
// if stopped, don't process any more events
break EventLoop
}
}
}
}()
}
wg.Wait()
return nil
}
// Close will make the postprocessing service to stop processing, so the `Run`
// method can finish.
// TODO: Underlying services can't be stopped. This means that some goroutines
// will get stuck trying to push events through a channel nobody is reading
// from, so resources won't be freed and there will be memory leaks. For now,
// if the service is stopped, you should close the app soon after.
func (pps *PostprocessingService) Close() {
if pps.stopped.CompareAndSwap(false, true) {
close(pps.stopCh)
}
}
func (pps *PostprocessingService) processEvent(e raw.Event) error {
pps.log.Debug().Str("Type", e.Type).Str("ID", e.ID).Msg("processing event received")
var (
next interface{}
pp *postprocessing.Postprocessing