mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-04 12:10:21 -05:00
Bumps [github.com/open-policy-agent/opa](https://github.com/open-policy-agent/opa) from 1.6.0 to 1.8.0. - [Release notes](https://github.com/open-policy-agent/opa/releases) - [Changelog](https://github.com/open-policy-agent/opa/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-policy-agent/opa/compare/v1.6.0...v1.8.0) --- updated-dependencies: - dependency-name: github.com/open-policy-agent/opa dependency-version: 1.8.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
61 lines
1.6 KiB
Go
61 lines
1.6 KiB
Go
package httprc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
type worker struct {
|
|
httpcl HTTPClient
|
|
incoming chan any
|
|
next <-chan Resource
|
|
nextsync <-chan synchronousRequest
|
|
errSink ErrorSink
|
|
traceSink TraceSink
|
|
}
|
|
|
|
func (w worker) Run(ctx context.Context, readywg *sync.WaitGroup, donewg *sync.WaitGroup) {
|
|
w.traceSink.Put(ctx, "httprc worker: START worker loop")
|
|
defer w.traceSink.Put(ctx, "httprc worker: END worker loop")
|
|
defer donewg.Done()
|
|
ctx = withTraceSink(ctx, w.traceSink)
|
|
ctx = withHTTPClient(ctx, w.httpcl)
|
|
|
|
readywg.Done()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
w.traceSink.Put(ctx, "httprc worker: stopping worker loop")
|
|
return
|
|
case r := <-w.next:
|
|
w.traceSink.Put(ctx, fmt.Sprintf("httprc worker: syncing %q (async)", r.URL()))
|
|
if err := r.Sync(ctx); err != nil {
|
|
w.errSink.Put(ctx, err)
|
|
}
|
|
r.SetBusy(false)
|
|
|
|
w.sendAdjustIntervalRequest(ctx, r)
|
|
case sr := <-w.nextsync:
|
|
w.traceSink.Put(ctx, fmt.Sprintf("httprc worker: syncing %q (synchronous)", sr.resource.URL()))
|
|
if err := sr.resource.Sync(ctx); err != nil {
|
|
sendReply(ctx, sr.reply, struct{}{}, err)
|
|
sr.resource.SetBusy(false)
|
|
return
|
|
}
|
|
sr.resource.SetBusy(false)
|
|
sendReply(ctx, sr.reply, struct{}{}, nil)
|
|
w.sendAdjustIntervalRequest(ctx, sr.resource)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w worker) sendAdjustIntervalRequest(ctx context.Context, r Resource) {
|
|
w.traceSink.Put(ctx, "httprc worker: Sending interval adjustment request for "+r.URL())
|
|
select {
|
|
case <-ctx.Done():
|
|
case w.incoming <- adjustIntervalRequest{resource: r}:
|
|
}
|
|
w.traceSink.Put(ctx, "httprc worker: Sent interval adjustment request for "+r.URL())
|
|
}
|