Files

108 lines
2.4 KiB
Go

package testcontainers
import (
"context"
"fmt"
"sync"
)
const (
defaultWorkersCount = 8
)
type ParallelContainerRequest []GenericContainerRequest
// ParallelContainersOptions represents additional options for parallel running
type ParallelContainersOptions struct {
WorkersCount int // count of parallel workers. If field empty(zero), default value will be 'defaultWorkersCount'
}
// ParallelContainersRequestError represents error from parallel request
type ParallelContainersRequestError struct {
Request GenericContainerRequest
Error error
}
type ParallelContainersError struct {
Errors []ParallelContainersRequestError
}
func (gpe ParallelContainersError) Error() string {
return fmt.Sprintf("%v", gpe.Errors)
}
// parallelContainersResult represents result.
type parallelContainersResult struct {
ParallelContainersRequestError
Container Container
}
func parallelContainersRunner(
ctx context.Context,
requests <-chan GenericContainerRequest,
results chan<- parallelContainersResult,
wg *sync.WaitGroup,
) {
defer wg.Done()
for req := range requests {
c, err := GenericContainer(ctx, req)
res := parallelContainersResult{Container: c}
if err != nil {
res.Request = req
res.Error = err
}
results <- res
}
}
// ParallelContainers creates a generic containers with parameters and run it in parallel mode
func ParallelContainers(ctx context.Context, reqs ParallelContainerRequest, opt ParallelContainersOptions) ([]Container, error) {
if opt.WorkersCount == 0 {
opt.WorkersCount = defaultWorkersCount
}
tasksChanSize := min(opt.WorkersCount, len(reqs))
tasksChan := make(chan GenericContainerRequest, tasksChanSize)
resultsChan := make(chan parallelContainersResult, tasksChanSize)
done := make(chan struct{})
var wg sync.WaitGroup
wg.Add(tasksChanSize)
// run workers
for range tasksChanSize {
go parallelContainersRunner(ctx, tasksChan, resultsChan, &wg)
}
var errs []ParallelContainersRequestError
containers := make([]Container, 0, len(reqs))
go func() {
defer close(done)
for res := range resultsChan {
if res.Error != nil {
errs = append(errs, res.ParallelContainersRequestError)
} else {
containers = append(containers, res.Container)
}
}
}()
for _, req := range reqs {
tasksChan <- req
}
close(tasksChan)
wg.Wait()
close(resultsChan)
<-done
if len(errs) != 0 {
return containers, ParallelContainersError{Errors: errs}
}
return containers, nil
}