Files
opencloud/pkg/runner/runner.go
2025-09-12 12:18:47 +02:00

202 lines
6.9 KiB
Go

package runner
import (
"context"
"sync/atomic"
"time"
)
// Runner represents the one executing a long running task, such as a server
// or a service.
// The ID of the runner is public to make identification easier, and the
// Result that it will generated will contain the same ID, so we can
// know which runner provided which result.
//
// Runners are intended to be used only once. Reusing them isn't possible.
// You'd need to create a new runner if you want to rerun the same task.
type Runner struct {
ID string
interruptDur time.Duration
fn Runable
interrupt Stopper
running atomic.Bool
interrupted atomic.Bool
interruptedCh chan time.Duration
finished chan struct{}
}
// New will create a new runner.
// The runner will be created with the provided id (the id must be unique,
// otherwise undefined behavior might occur), and will run the provided
// runable task, using the "interrupt" function to stop that task if needed.
//
// The interrupt duration, which can be set through the `WithInterruptDuration`
// option, will be used to ensure the runner doesn't block forever. If the
// option isn't supplied, the default value `DefaultInterruptDuration` will be used.
// The interrupt duration will be used to start a timeout when the
// runner gets interrupted (either the context of the `Run` method is done
// or this runner's `Interrupt` method is called). If the timeout is reached,
// a timeout result will be returned instead of whatever result the task should
// be returning.
//
// Note that it's your responsibility to provide a proper stopper for the task.
// The runner will just call that method assuming it will be enough to
// eventually stop the task at some point.
func New(id string, fn Runable, interrupt Stopper, opts ...Option) *Runner {
options := Options{
InterruptDuration: DefaultInterruptDuration,
}
for _, o := range opts {
o(&options)
}
return &Runner{
ID: id,
interruptDur: options.InterruptDuration,
fn: fn,
interrupt: interrupt,
interruptedCh: make(chan time.Duration, 1),
finished: make(chan struct{}),
}
}
// Run will execute the task associated to this runner in a synchronous way.
// The task will be spawned in a new goroutine, and the current thread will
// wait until the task finishes.
//
// The task will finish "naturally". The stopper will be called in the
// following ways:
// - Manually calling this runner's `Interrupt` method
// - When the provided context is done
// As said, it's expected that calling the provided stopper will be enough to
// make the task to eventually complete.
//
// Once the task finishes, the result will be returned.
// When the context is done, or if the runner is interrupted, a timeout will
// start using the provided "interrupt duration". If this timeout is reached,
// a timeout result will be returned instead of the one from the task. This is
// intended to prevent blocking the main thread indefinitely. A suitable
// duration should be used depending on the task, usually 5, 10 or 30 secs
//
// Some nice things you can do:
// - Use signal.NotifyContext(...) to call the stopper and provide a clean
// shutdown procedure when an OS signal is received
// - Use context.WithDeadline(...) or context.WithTimeout(...) to run the task
// for a limited time
func (r *Runner) Run(ctx context.Context) *Result {
if !r.running.CompareAndSwap(false, true) {
// If not swapped, the task is already running.
// Running the same task multiple times is a bug, so we panic
panic("Runner with id " + r.ID + " was running twice")
}
ch := make(chan *Result)
go r.doTask(ch, true)
select {
case result := <-ch:
return result
case <-ctx.Done():
r.Interrupt()
return <-ch
}
}
// RunAsync will execute the task associated to this runner asynchronously.
// The task will be spawned in a new goroutine and this method will finish.
// The task's result will be written in the provided channel when it's
// available, so you can wait for it if needed. It's up to you to decide
// to use a blocking or non-blocking channel, but the task will always finish
// before writing in the channel.
//
// To interrupt the running task, the only option is to call the `Interrupt`
// method at some point.
func (r *Runner) RunAsync(ch chan<- *Result) {
if !r.running.CompareAndSwap(false, true) {
// If not swapped, the task is already running.
// Running the same task multiple times is a bug, so we panic
panic("Runner with id " + r.ID + " was running twice")
}
go r.doTask(ch, false)
}
// Interrupt will execute the stopper function, which should notify the task
// in order for it to finish.
// The stopper will be called immediately, although it's expected the
// consequences to take a while (task might need a while to stop)
// A timeout will start using the provided "interrupt duration". Once that
// timeout is reached, the task must provide a result with a timeout error.
// Note that, even after returning the timeout result, the task could still
// be being executed and consuming resource.
// This method will be called only once. Further calls won't do anything
func (r *Runner) Interrupt() {
if r.interrupted.CompareAndSwap(false, true) {
go func() {
select {
case <-r.Finished():
// Task finished -> runner should be delivering the result
case <-time.After(r.interruptDur):
// timeout reached -> send it through the channel so our runner
// can abort
r.interruptedCh <- r.interruptDur
close(r.interruptedCh)
}
}()
r.interrupt()
}
}
// Finished will return a receive-only channel that can be used to know when
// the task has finished but the result hasn't been made available yet. The
// channel will be closed (without sending any message) when the task has finished.
// This can be used specially with the `RunAsync` method when multiple runners
// use the same channel: results could be waiting on your side of the channel
func (r *Runner) Finished() <-chan struct{} {
return r.finished
}
// doTask will perform this runner's task and write the result in the provided
// channel. The channel will be closed if requested.
// A result will be provided when either the task finishes naturally or we
// reach the timeout after being interrupted
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
tmpCh := make(chan *Result, 1)
// spawn the task and return the result in a temporary channel
go func(tmpCh chan *Result) {
err := r.fn()
close(r.finished)
result := &Result{
RunnerID: r.ID,
RunnerError: err,
}
tmpCh <- result
close(tmpCh)
}(tmpCh)
// wait for the result in the temporary channel or until we get the
// interrupted signal
var result *Result
select {
case d := <-r.interruptedCh:
result = &Result{
RunnerID: r.ID,
RunnerError: NewTimeoutError(r.ID, d),
}
case result = <-tmpCh:
// Just assign the received value, nothing else to do
}
// send the result
ch <- result
if closeChan {
close(ch)
}
}