mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2025-12-23 22:29:59 -05:00
263 lines
8.7 KiB
Go
263 lines
8.7 KiB
Go
package runner
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// GroupRunner represents a set of tasks that must run together.
// All the tasks are expected to be running at the same time and, as soon as
// one of them stops, the remaining ones are asked to stop too.
//
// The GroupRunner is meant for running several services which are mostly
// independent from each other, but where keeping only some of them alive is
// pointless: either all of them run, or none of them does.
// For instance, a GRPC server and an HTTP server may each provide a piece of
// functionality; if either of them fails, the feature they provide would be
// incomplete or broken.
//
// The interrupt duration of the group is configured through the
// `WithInterruptDuration` option. When the option isn't supplied, the default
// value `DefaultGroupInterruptDuration` is used.
//
// Timeouts should preferably be handled by each runner individually, which
// means every runner's timeout should be shorter than the group's timeout.
// That way it's possible to know which runner timed out. If the group timeout
// is reached, the results still missing will carry "_unknown_" as runner id.
//
// Note that, being services, the tasks aren't expected to stop on their own.
// Therefore, if one task finishes naturally, the rest of the tasks will be
// asked to stop as well.
type GroupRunner struct {
	// interruptDur is how long the group waits, once the runners have been
	// notified, before the group timeout is signalled.
	interruptDur time.Duration

	// runners holds the registered runners indexed by their id, and
	// runnersCount tracks how many of them have been stored.
	runners      sync.Map
	runnersCount int

	// runningMutex guards isRunning, which is set once the group starts.
	runningMutex sync.Mutex
	isRunning    bool

	// interrupted makes sure the interruption is carried out only once, and
	// interruptedCh delivers the group timeout when it expires.
	interrupted   atomic.Bool
	interruptedCh chan time.Duration
}
|
|
|
|
// NewGroup will create a GroupRunner
|
|
func NewGroup(opts ...Option) *GroupRunner {
|
|
options := Options{
|
|
InterruptDuration: DefaultGroupInterruptDuration,
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
return &GroupRunner{
|
|
runners: sync.Map{},
|
|
runningMutex: sync.Mutex{},
|
|
interruptDur: options.InterruptDuration,
|
|
interruptedCh: make(chan time.Duration, 1),
|
|
}
|
|
}
|
|
|
|
// Add will add a runner to the group.
|
|
//
|
|
// It's mandatory that each runner in the group has an unique id, otherwise
|
|
// there will be issues
|
|
// Adding new runners once the group starts will cause a panic
|
|
func (gr *GroupRunner) Add(r *Runner) {
|
|
gr.runningMutex.Lock()
|
|
defer gr.runningMutex.Unlock()
|
|
|
|
if gr.isRunning {
|
|
panic("Adding a new runner after the group starts is forbidden")
|
|
}
|
|
|
|
// LoadOrStore will try to store the runner
|
|
if _, loaded := gr.runners.LoadOrStore(r.ID, r); loaded {
|
|
// there is already a runner with the same id, which is forbidden
|
|
panic("Trying to add a runner with an existing Id in the group")
|
|
}
|
|
// Only increase the count if a runner is stored.
|
|
// Currently panicking if the runner exists and is loaded
|
|
gr.runnersCount++
|
|
}
|
|
|
|
// Run will execute all the tasks in the group at the same time.
|
|
//
|
|
// Similarly to the "regular" runner's `Run` method, the execution thread
|
|
// will be blocked here until all tasks are completed, and their results
|
|
// will be available (each result will have the runner's id so it's easy to
|
|
// find which one failed). Note that there is no guarantee about the result's
|
|
// order, so the first result in the slice might or might not be the first
|
|
// result to be obtained.
|
|
//
|
|
// When the context is marked as done, the groupRunner will call all the
|
|
// stoppers for each runner to notify each task to stop. Note that the tasks
|
|
// might still take a while to complete.
|
|
//
|
|
// If a task finishes naturally (with the context still "alive"), it will also
|
|
// cause the groupRunner to call the stoppers of the rest of the tasks. So if
|
|
// a task finishes, the rest will also finish.
|
|
// Note that it is NOT expected for the finished task's stopper to be called
|
|
// in this case.
|
|
func (gr *GroupRunner) Run(ctx context.Context) []*Result {
|
|
// Set the flag inside the runningMutex to ensure we don't read the old value
|
|
// in the `Add` method and add a new runner when this method is being executed
|
|
// Note that if multiple `Run` or `RunAsync` happens, the underlying runners
|
|
// will panic
|
|
gr.runningMutex.Lock()
|
|
gr.isRunning = true
|
|
gr.runningMutex.Unlock()
|
|
|
|
results := make([]*Result, 0, gr.runnersCount)
|
|
|
|
ch := make(chan *Result, gr.runnersCount) // no need to block writing results
|
|
gr.runners.Range(func(_, value any) bool {
|
|
r := value.(*Runner)
|
|
r.RunAsync(ch)
|
|
return true
|
|
})
|
|
|
|
var d time.Duration
|
|
// wait for a result or for the context to be done
|
|
select {
|
|
case result := <-ch:
|
|
results = append(results, result)
|
|
case d = <-gr.interruptedCh:
|
|
results = append(results, &Result{
|
|
RunnerID: "_unknown_",
|
|
RunnerError: NewGroupTimeoutError(d),
|
|
})
|
|
case <-ctx.Done():
|
|
// Do nothing
|
|
}
|
|
|
|
// interrupt the rest of the runners
|
|
gr.Interrupt()
|
|
|
|
// Having notified that the context has been finished, we still need to
|
|
// wait for the rest of the results
|
|
for i := len(results); i < gr.runnersCount; i++ {
|
|
select {
|
|
case result := <-ch:
|
|
results = append(results, result)
|
|
case d2, ok := <-gr.interruptedCh:
|
|
if ok {
|
|
d = d2
|
|
}
|
|
results = append(results, &Result{
|
|
RunnerID: "_unknown_",
|
|
RunnerError: NewGroupTimeoutError(d),
|
|
})
|
|
}
|
|
}
|
|
|
|
// Even if we reach the group time out and bail out early, tasks might
|
|
// be running and eventually deliver the result through the channel.
|
|
// We'll rely on the buffered channel so the tasks won't block and the
|
|
// data can be eventually garbage-collected along with the unused
|
|
// channel, so we won't close the channel here.
|
|
return results
|
|
}
|
|
|
|
// RunAsync will execute the tasks in the group asynchronously.
|
|
// The result of each task will be placed in the provided channel as soon
|
|
// as it's available.
|
|
// Note that this method will finish as soon as all the tasks are running.
|
|
func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
|
|
// Set the flag inside the runningMutex to ensure we don't read the old value
|
|
// in the `Add` method and add a new runner when this method is being executed
|
|
// Note that if multiple `Run` or `RunAsync` happens, the underlying runners
|
|
// will panic
|
|
gr.runningMutex.Lock()
|
|
gr.isRunning = true
|
|
gr.runningMutex.Unlock()
|
|
|
|
// we need a secondary channel to receive the first result so we can
|
|
// interrupt the rest of the tasks
|
|
interCh := make(chan *Result, gr.runnersCount)
|
|
gr.runners.Range(func(_, value any) bool {
|
|
r := value.(*Runner)
|
|
r.RunAsync(interCh)
|
|
return true
|
|
})
|
|
|
|
go func() {
|
|
var result *Result
|
|
var d time.Duration
|
|
|
|
select {
|
|
case result = <-interCh:
|
|
// result already assigned, so do nothing
|
|
case d = <-gr.interruptedCh:
|
|
// we aren't tracking which runners have finished and which are still
|
|
// running, so we'll use "_unknown_" as runner id
|
|
result = &Result{
|
|
RunnerID: "_unknown_",
|
|
RunnerError: NewGroupTimeoutError(d),
|
|
}
|
|
}
|
|
gr.Interrupt()
|
|
|
|
ch <- result
|
|
for i := 1; i < gr.runnersCount; i++ {
|
|
select {
|
|
case result = <-interCh:
|
|
// result already assigned, so do nothing
|
|
case d2, ok := <-gr.interruptedCh:
|
|
// if ok is true, d2 will have a good value; if false, the channel
|
|
// is closed and we get a default value
|
|
if ok {
|
|
d = d2
|
|
}
|
|
result = &Result{
|
|
RunnerID: "_unknown_",
|
|
RunnerError: NewGroupTimeoutError(d),
|
|
}
|
|
}
|
|
ch <- result
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Interrupt will execute the stopper function of ALL the tasks, which should
|
|
// notify the tasks in order for them to finish.
|
|
// The stoppers will be called immediately but sequentially. This means that
|
|
// the second stopper won't be called until the first one has returned. This
|
|
// usually isn't a problem because the service `Stop`'s methods either don't
|
|
// take a long time to return, or they run asynchronously in another goroutine.
|
|
//
|
|
// As said, this will affect ALL the tasks in the group. It isn't possible to
|
|
// try to stop just one task.
|
|
// If a task has finished, the corresponding stopper won't be called
|
|
//
|
|
// The interrupt timeout for the group will start after all the runners in the
|
|
// group have been notified. Note that, if the task's stopper for a runner
|
|
// takes a lot of time to return, it will delay the timeout's start, so it's
|
|
// advised that the stopper either returns fast or is run asynchronously.
|
|
func (gr *GroupRunner) Interrupt() {
|
|
if gr.interrupted.CompareAndSwap(false, true) {
|
|
gr.runners.Range(func(_, value any) bool {
|
|
r := value.(*Runner)
|
|
select {
|
|
case <-r.Finished():
|
|
// No data should be sent through the channel, so we'd be
|
|
// here only if the channel is closed. This means the task
|
|
// has finished and we don't need to interrupt. We do
|
|
// nothing in this case
|
|
default:
|
|
r.Interrupt()
|
|
}
|
|
return true
|
|
})
|
|
|
|
_ = time.AfterFunc(gr.interruptDur, func() {
|
|
// timeout reached -> send it through the channel so our runner
|
|
// can abort
|
|
gr.interruptedCh <- gr.interruptDur
|
|
close(gr.interruptedCh)
|
|
})
|
|
}
|
|
}
|