Files
kopia/internal/parallelwork/parallel_work_queue.go
Julio Lopez bb20d9e11a chore(ci): enable wsl_v5:{assign,expr} linter settings (#4982)
Enable wsl_v5 settings:
- assign
- expr
2025-11-12 23:12:06 -08:00

175 lines
3.6 KiB
Go

// Package parallelwork implements parallel work queue with fixed number of workers that concurrently process and add work items to the queue.
package parallelwork
import (
"container/list"
"context"
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/clock"
)
// Queue represents a work queue with multiple parallel workers.
type Queue struct {
	// monitor guards all fields below; its condition variable is used to
	// wake workers when items are enqueued or work completes.
	monitor *sync.Cond

	// queueItems holds pending CallbackFunc values awaiting a worker.
	queueItems *list.List

	// Progress counters, guarded by monitor.L.
	enqueuedWork      int64 // total items ever enqueued
	activeWorkerCount int64 // workers currently executing a callback
	completedWork     int64 // total callbacks that have finished

	// nextReportTime throttles progress reporting (at most ~1/second).
	nextReportTime time.Time

	// ProgressCallback, if non-nil, is invoked periodically with the
	// current counter values.
	ProgressCallback func(ctx context.Context, enqueued, active, completed int64)
}
// CallbackFunc is a single unit of work executed by a queue worker;
// returning a non-nil error aborts queue processing.
type CallbackFunc func() error
// EnqueueFront adds the work to the front of the queue.
func (v *Queue) EnqueueFront(ctx context.Context, callback CallbackFunc) {
	const atFront = true

	v.enqueue(ctx, atFront, callback)
}
// EnqueueBack adds the work to the back of the queue.
func (v *Queue) EnqueueBack(ctx context.Context, callback CallbackFunc) {
	const atFront = false

	v.enqueue(ctx, atFront, callback)
}
// enqueue inserts the callback at the front or back of the queue,
// bumps the enqueued counter and wakes a single waiting worker.
func (v *Queue) enqueue(ctx context.Context, front bool, callback CallbackFunc) {
	v.monitor.L.Lock()
	defer v.monitor.L.Unlock()

	v.enqueuedWork++

	// Insert the item at the requested end of the queue.
	switch {
	case front:
		v.queueItems.PushFront(callback)
	default:
		v.queueItems.PushBack(callback)
	}

	v.maybeReportProgress(ctx)

	// One new item - one worker needs to wake up.
	v.monitor.Signal()
}
// Process starts N workers, which will be processing elements in the queue until the queue
// is empty and all workers are idle or until any of the workers returns an error.
func (v *Queue) Process(ctx context.Context, workers int) error {
	// Emit one final progress report when processing finishes.
	defer v.reportProgress(ctx)

	eg, ctx := errgroup.WithContext(ctx)

	for range workers {
		eg.Go(func() error {
			for {
				// Bail out early if another worker already failed and
				// canceled the group context.
				if err := ctx.Err(); err != nil {
					return err
				}

				callback := v.dequeue(ctx)
				if callback == nil {
					// Queue drained and nobody is producing - shut down.
					return nil
				}

				err := callback()
				v.completed(ctx)

				if err != nil {
					return err
				}
			}
		})
	}

	//nolint:wrapcheck
	return eg.Wait()
}
// dequeue blocks until a work item becomes available, returning nil when
// the queue is empty and no active worker can produce more work.
func (v *Queue) dequeue(ctx context.Context) CallbackFunc {
	v.monitor.L.Lock()
	defer v.monitor.L.Unlock()

	// Wait while the queue is empty but active workers may still add items.
	for v.queueItems.Len() == 0 && v.activeWorkerCount > 0 {
		v.monitor.Wait()
	}

	if v.queueItems.Len() == 0 {
		// Still empty and nobody is active - no more work will arrive.
		return nil
	}

	v.activeWorkerCount++
	v.maybeReportProgress(ctx)

	// list.Remove returns the removed element's stored value.
	return v.queueItems.Remove(v.queueItems.Front()).(CallbackFunc) //nolint:forcetypeassert
}
// completed records that a worker finished one callback and wakes all
// waiters, since the last active worker finishing may mean no more work
// is coming.
func (v *Queue) completed(ctx context.Context) {
	v.monitor.L.Lock()
	defer v.monitor.L.Unlock()

	v.completedWork++
	v.activeWorkerCount--

	v.maybeReportProgress(ctx)

	// Broadcast (not Signal): every blocked worker must re-check the
	// termination condition in dequeue.
	v.monitor.Broadcast()
}
// reportProgress invokes ProgressCallback (when set) with the current
// counter values.
func (v *Queue) reportProgress(ctx context.Context) {
	if cb := v.ProgressCallback; cb != nil {
		cb(ctx, v.enqueuedWork, v.activeWorkerCount, v.completedWork)
	}
}
// maybeReportProgress emits a progress report, throttled to at most one
// report per second.
func (v *Queue) maybeReportProgress(ctx context.Context) {
	// Read the clock once so the throttle check and the next deadline are
	// based on the same timestamp (the original read it twice, computing
	// the deadline from a slightly later instant).
	now := clock.Now()
	if now.Before(v.nextReportTime) {
		return
	}

	v.nextReportTime = now.Add(time.Second)

	v.reportProgress(ctx)
}
// OnNthCompletion invokes the provided callback once the returned callback function has been invoked exactly n times.
func OnNthCompletion(n int, callback CallbackFunc) CallbackFunc {
	var (
		mu        sync.Mutex
		remaining = n
	)

	return func() error {
		mu.Lock()
		remaining--
		shouldInvoke := remaining == 0
		mu.Unlock()

		if !shouldInvoke {
			return nil
		}

		return callback()
	}
}
// NewQueue returns new parallel work queue.
func NewQueue() *Queue {
	q := &Queue{
		monitor:    sync.NewCond(&sync.Mutex{}),
		queueItems: list.New(),
	}

	return q
}