mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2025-12-23 22:29:59 -05:00
294 lines
8.2 KiB
Go
294 lines
8.2 KiB
Go
package runner_test
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
"github.com/opencloud-eu/opencloud/pkg/runner"
|
|
)
|
|
|
|
var _ = Describe("GroupRunner", func() {
|
|
var (
|
|
gr *runner.GroupRunner
|
|
)
|
|
|
|
BeforeEach(func() {
	gr = runner.NewGroup()

	// addTimed registers a runner built from TimedTask (defined outside this
	// file). The callback passed as the third argument to runner.New sends nil
	// on the task's channel and then closes it.
	// NOTE(review): presumably that callback is the runner's stop/interrupt
	// hook - confirm against runner.New.
	addTimed := func(id string, d time.Duration) {
		signalCh := make(chan error)
		gr.Add(runner.New(id, TimedTask(signalCh, d), func() {
			signalCh <- nil
			close(signalCh)
		}))
	}

	// Every spec starts with these two tasks already in the group.
	addTimed("task1", 30*time.Second)
	addTimed("task2", 20*time.Second)
})
|
|
|
|
Describe("Add", func() {
|
|
It("Duplicated runner id panics", func() {
	// BeforeEach already registered a runner with id "task1", so adding a
	// second runner with the same id is expected to panic.
	addDuplicate := func() {
		sleeper := func() error {
			time.Sleep(6 * time.Second)
			return nil
		}
		gr.Add(runner.New("task1", sleeper, func() {}))
	}

	Expect(addDuplicate).To(Panic())
})
|
|
|
|
It("Add after run panics", func(ctx SpecContext) {
	// The run context expires after 1 second.
	runCtx, stop := context.WithTimeout(ctx, 1*time.Second)
	defer stop()

	// Run the group in a separate goroutine and hand its results back
	// through a channel.
	resultsCh := make(chan []*runner.Result)
	go func() {
		resultsCh <- gr.Run(runCtx)
		close(resultsCh)
	}()

	// The context is done after 1 second, so every task should be
	// interrupted and report a result without error.
	wantTask1 := &runner.Result{RunnerID: "task1", RunnerError: nil}
	wantTask2 := &runner.Result{RunnerID: "task2", RunnerError: nil}
	Eventually(ctx, resultsCh).Should(Receive(ContainElements(wantTask1, wantTask2)))

	// The group has already run: adding another runner must panic.
	lateCh := make(chan error)
	lateTask := TimedTask(lateCh, 6*time.Second)
	addLate := func() {
		gr.Add(runner.New("task3", lateTask, func() {
			lateCh <- nil
			close(lateCh)
		}))
	}
	Expect(addLate).To(Panic())
}, SpecTimeout(5*time.Second))
|
|
|
|
It("Add after runAsync panics", func(ctx SpecContext) {
	// Start the group asynchronously; this spec never reads from the
	// results channel.
	asyncResults := make(chan *runner.Result)
	gr.RunAsync(asyncResults)

	// RunAsync has been called, so adding another runner must panic.
	addAfterStart := func() {
		lateCh := make(chan error)
		lateTask := TimedTask(lateCh, 6*time.Second)
		gr.Add(runner.New("task3", lateTask, func() {
			lateCh <- nil
			close(lateCh)
		}))
	}
	Expect(addAfterStart).To(Panic())
}, SpecTimeout(5*time.Second))
|
|
})
|
|
|
|
Describe("Run", func() {
|
|
It("Context is done", func(ctx SpecContext) {
	// Add a third task next to the two registered in BeforeEach.
	thirdCh := make(chan error)
	thirdTask := TimedTask(thirdCh, 6*time.Second)
	gr.Add(runner.New("task3", thirdTask, func() {
		thirdCh <- nil
		close(thirdCh)
	}))

	// The run context expires after 1 second.
	runCtx, stop := context.WithTimeout(ctx, 1*time.Second)
	defer stop()

	// Run the group in a separate goroutine and hand its results back
	// through a channel.
	resultsCh := make(chan []*runner.Result)
	go func() {
		resultsCh <- gr.Run(runCtx)
		close(resultsCh)
	}()

	// The context is done after 1 second, so all three tasks should be
	// interrupted and report a result without error.
	wantTask1 := &runner.Result{RunnerID: "task1", RunnerError: nil}
	wantTask2 := &runner.Result{RunnerID: "task2", RunnerError: nil}
	wantTask3 := &runner.Result{RunnerID: "task3", RunnerError: nil}
	Eventually(ctx, resultsCh).Should(Receive(ContainElements(wantTask1, wantTask2, wantTask3)))
}, SpecTimeout(5*time.Second))
|
|
|
|
It("One task finishes early", func(ctx SpecContext) {
	// The third task is configured with only 1 second, far less than the
	// 30 and 20 seconds used for the tasks registered in BeforeEach.
	shortCh := make(chan error)
	shortTask := TimedTask(shortCh, 1*time.Second)
	gr.Add(runner.New("task3", shortTask, func() {
		shortCh <- nil
		close(shortCh)
	}))

	// The run context lasts 10 seconds, longer than the 5 second spec
	// timeout, so its expiry is not what ends the run here.
	runCtx, stop := context.WithTimeout(ctx, 10*time.Second)
	defer stop()

	// Run the group in a separate goroutine and hand its results back
	// through a channel.
	resultsCh := make(chan []*runner.Result)
	go func() {
		resultsCh <- gr.Run(runCtx)
		close(resultsCh)
	}()

	// task3 finishes after 1 second, so the remaining tasks should be
	// interrupted as well and all three report a result without error.
	wantTask1 := &runner.Result{RunnerID: "task1", RunnerError: nil}
	wantTask2 := &runner.Result{RunnerID: "task2", RunnerError: nil}
	wantTask3 := &runner.Result{RunnerID: "task3", RunnerError: nil}
	Eventually(ctx, resultsCh).Should(Receive(ContainElements(wantTask1, wantTask2, wantTask3)))
}, SpecTimeout(5*time.Second))
|
|
|
|
It("Context done and group timeout reached", func(ctx SpecContext) {
	// A dedicated group (shadowing the shared one) configured with a
	// 2 second interrupt duration.
	gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))

	// Both tasks just sleep for 6 seconds, and the callback given as the
	// third argument to runner.New is empty, so nothing cuts the sleep short.
	for _, id := range []string{"task1", "task2"} {
		gr.Add(runner.New(id, func() error {
			time.Sleep(6 * time.Second)
			return nil
		}, func() {}))
	}

	// The run context expires after 1 second.
	runCtx, stop := context.WithTimeout(ctx, 1*time.Second)
	defer stop()

	// Run the group in a separate goroutine and hand its results back
	// through a channel.
	resultsCh := make(chan []*runner.Result)
	go func() {
		resultsCh <- gr.Run(runCtx)
		close(resultsCh)
	}()

	// groupTimeout builds the result expected for a task that did not
	// finish before the group timeout.
	groupTimeout := func() *runner.Result {
		return &runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}
	}

	// The context finishes after 1 second and the tasks get interrupted;
	// the group timeout is reached 2 seconds after that.
	Eventually(ctx, resultsCh).Should(Receive(ContainElements(
		groupTimeout(),
		groupTimeout(),
	)))
}, SpecTimeout(5*time.Second))
|
|
|
|
It("Interrupted and group timeout reached", func(ctx SpecContext) {
	// A dedicated group (shadowing the shared one) configured with a
	// 2 second interrupt duration.
	gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))

	// Both tasks just sleep for 6 seconds, and the callback given as the
	// third argument to runner.New is empty, so nothing cuts the sleep short.
	for _, id := range []string{"task1", "task2"} {
		gr.Add(runner.New(id, func() error {
			time.Sleep(6 * time.Second)
			return nil
		}, func() {}))
	}

	// The run context lasts 10 seconds, longer than the 5 second spec
	// timeout, so its expiry is not what ends the run here.
	runCtx, stop := context.WithTimeout(ctx, 10*time.Second)
	defer stop()

	// Run the group in a separate goroutine and hand its results back
	// through a channel, then interrupt the group explicitly.
	resultsCh := make(chan []*runner.Result)
	go func() {
		resultsCh <- gr.Run(runCtx)
		close(resultsCh)
	}()
	gr.Interrupt()

	// groupTimeout builds the result expected for a task that did not
	// finish before the group timeout.
	groupTimeout := func() *runner.Result {
		return &runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}
	}

	// The tasks get interrupted; the group timeout is reached 2 seconds
	// after that.
	Eventually(ctx, resultsCh).Should(Receive(ContainElements(
		groupTimeout(),
		groupTimeout(),
	)))
}, SpecTimeout(5*time.Second))
|
|
|
|
It("Double run panics", func(ctx SpecContext) {
	// The run context expires after 1 second, which lets the first Run
	// return instead of waiting out the long tasks added in BeforeEach.
	myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	// The first Run is kept outside the asserted closure on purpose: if it
	// panicked, the spec must fail rather than satisfy the assertion below.
	gr.Run(myCtx)

	// Only the second Run on the same group is expected to panic.
	Expect(func() {
		gr.Run(myCtx)
	}).To(Panic())
}, SpecTimeout(5*time.Second))
|
|
})
|
|
|
|
Describe("RunAsync", func() {
|
|
It("Wait in channel", func(ctx SpecContext) {
|
|
task3Ch := make(chan error)
|
|
task3 := TimedTask(task3Ch, 1*time.Second)
|
|
gr.Add(runner.New("task3", task3, func() {
|
|
task3Ch <- nil
|
|
close(task3Ch)
|
|
}))
|
|
|
|
ch2 := make(chan *runner.Result)
|
|
gr.RunAsync(ch2)
|
|
|
|
// task3 finishes in 1 sec, so the rest should also be interrupted
|
|
Eventually(ctx, ch2).Should(Receive())
|
|
Eventually(ctx, ch2).Should(Receive())
|
|
Eventually(ctx, ch2).Should(Receive())
|
|
}, SpecTimeout(5*time.Second))
|
|
|
|
It("Double runAsync panics", func(ctx SpecContext) {
	ch2 := make(chan *runner.Result)

	// The first RunAsync is kept outside the asserted closure on purpose: if
	// it panicked, the spec must fail rather than satisfy the assertion below.
	gr.RunAsync(ch2)

	// Only the second RunAsync on the same group is expected to panic.
	Expect(func() {
		gr.RunAsync(ch2)
	}).To(Panic())
}, SpecTimeout(5*time.Second))
|
|
|
|
It("Interrupt async", func(ctx SpecContext) {
|
|
task3Ch := make(chan error)
|
|
task3 := TimedTask(task3Ch, 6*time.Second)
|
|
gr.Add(runner.New("task3", task3, func() {
|
|
task3Ch <- nil
|
|
close(task3Ch)
|
|
}))
|
|
|
|
ch2 := make(chan *runner.Result)
|
|
gr.RunAsync(ch2)
|
|
gr.Interrupt()
|
|
|
|
// tasks will be interrupted
|
|
Eventually(ctx, ch2).Should(Receive())
|
|
Eventually(ctx, ch2).Should(Receive())
|
|
Eventually(ctx, ch2).Should(Receive())
|
|
}, SpecTimeout(5*time.Second))
|
|
|
|
It("Interrupt async group timeout reached", func(ctx SpecContext) {
	// A dedicated group (shadowing the shared one) configured with a
	// 2 second interrupt duration.
	gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))

	// Both tasks just sleep for 6 seconds, and the callback given as the
	// third argument to runner.New is empty, so nothing cuts the sleep short.
	for _, id := range []string{"task1", "task2"} {
		gr.Add(runner.New(id, func() error {
			time.Sleep(6 * time.Second)
			return nil
		}, func() {}))
	}

	// Start the group asynchronously and interrupt it right away.
	asyncResults := make(chan *runner.Result)
	gr.RunAsync(asyncResults)
	gr.Interrupt()

	// groupTimeout builds the result expected for a task that did not
	// finish before the group timeout.
	groupTimeout := func() *runner.Result {
		return &runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}
	}

	// The group timeout is reached 2 seconds after the interrupt; each of
	// the two tasks is then reported as a group-timeout result.
	Eventually(ctx, asyncResults).Should(Receive(Equal(groupTimeout())))
	Eventually(ctx, asyncResults).Should(Receive(Equal(groupTimeout())))
}, SpecTimeout(5*time.Second))
|
|
})
|
|
})
|