refactor(GODT-2500): Reorganise async methods.

This commit is contained in:
Jakub
2023-03-30 17:54:23 +02:00
committed by cuthix
parent 1d5a7231e2
commit 3cc8423ece
18 changed files with 105 additions and 91 deletions

View File

@@ -4,7 +4,7 @@ import (
"bytes"
"context"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/bradenaw/juniper/parallel"
)
@@ -59,10 +59,10 @@ func (SequentialScheduler) Schedule(ctx context.Context, attachmentIDs []string,
type ParallelScheduler struct {
workers int
panicHandler queue.PanicHandler
panicHandler async.PanicHandler
}
func NewParallelScheduler(workers int, panicHandler queue.PanicHandler) *ParallelScheduler {
func NewParallelScheduler(workers int, panicHandler async.PanicHandler) *ParallelScheduler {
if workers == 0 {
workers = 1
}
@@ -70,12 +70,6 @@ func NewParallelScheduler(workers int, panicHandler queue.PanicHandler) *Paralle
return &ParallelScheduler{workers: workers}
}
func (p *ParallelScheduler) handlePanic() {
if p.panicHandler != nil {
p.panicHandler.HandlePanic()
}
}
func (p ParallelScheduler) Schedule(ctx context.Context, attachmentIDs []string, storageProvider AttachmentAllocator, downloader func(context.Context, string, *bytes.Buffer) error) ([]*bytes.Buffer, error) {
// If we have less attachments than the maximum works, reduce worker count to match attachment count.
workers := p.workers
@@ -84,7 +78,7 @@ func (p ParallelScheduler) Schedule(ctx context.Context, attachmentIDs []string,
}
return parallel.MapContext(ctx, workers, attachmentIDs, func(ctx context.Context, id string) (*bytes.Buffer, error) {
defer p.handlePanic()
defer async.HandlePanic(p.panicHandler)
buffer := storageProvider.NewBuffer()
if err := downloader(ctx, id, buffer); err != nil {

View File

@@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/go-resty/resty/v2"
)
@@ -60,7 +61,7 @@ func (c *Client) NewEventStream(ctx context.Context, period, jitter time.Duratio
eventCh := make(chan Event)
go func() {
defer c.m.handlePanic()
defer async.HandlePanic(c.m.panicHandler)
defer close(eventCh)

View File

@@ -1,10 +1,12 @@
package proton
import "github.com/ProtonMail/gluon/queue"
import (
"github.com/ProtonMail/gluon/async"
)
type Future[T any] struct {
resCh chan res[T]
panicHandler queue.PanicHandler
panicHandler async.PanicHandler
}
type res[T any] struct {
@@ -12,7 +14,7 @@ type res[T any] struct {
err error
}
func NewFuture[T any](panicHandler queue.PanicHandler, fn func() (T, error)) *Future[T] {
func NewFuture[T any](panicHandler async.PanicHandler, fn func() (T, error)) *Future[T] {
resCh := make(chan res[T])
job := &Future[T]{
resCh: resCh,
@@ -20,7 +22,7 @@ func NewFuture[T any](panicHandler queue.PanicHandler, fn func() (T, error)) *Fu
}
go func() {
defer job.handlePanic()
defer async.HandlePanic(job.panicHandler)
val, err := fn()
@@ -32,7 +34,7 @@ func NewFuture[T any](panicHandler queue.PanicHandler, fn func() (T, error)) *Fu
func (job *Future[T]) Then(fn func(T, error)) {
go func() {
defer job.handlePanic()
defer async.HandlePanic(job.panicHandler)
res := <-job.resCh
@@ -40,12 +42,6 @@ func (job *Future[T]) Then(fn func(T, error)) {
}()
}
func (job *Future[T]) handlePanic() {
if job.panicHandler != nil {
job.panicHandler.HandlePanic()
}
}
func (job *Future[T]) Get() (T, error) {
res := <-job.resCh
@@ -54,10 +50,10 @@ func (job *Future[T]) Get() (T, error) {
type Group[T any] struct {
futures []*Future[T]
panicHandler queue.PanicHandler
panicHandler async.PanicHandler
}
func NewGroup[T any](panicHandler queue.PanicHandler) *Group[T] {
func NewGroup[T any](panicHandler async.PanicHandler) *Group[T] {
return &Group[T]{panicHandler: panicHandler}
}

View File

@@ -5,14 +5,14 @@ import (
"testing"
"time"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/stretchr/testify/require"
)
func TestFuture(t *testing.T) {
resCh := make(chan int)
NewFuture(queue.NoopPanicHandler{}, func() (int, error) {
NewFuture(async.NoopPanicHandler{}, func() (int, error) {
return 42, nil
}).Then(func(res int, err error) {
resCh <- res
@@ -22,7 +22,7 @@ func TestFuture(t *testing.T) {
}
func TestGroup(t *testing.T) {
group := NewGroup[int](queue.NoopPanicHandler{})
group := NewGroup[int](async.NoopPanicHandler{})
for i := 0; i < 10; i++ {
i := i

17
go.mod
View File

@@ -4,7 +4,7 @@ go 1.18
require (
github.com/Masterminds/semver/v3 v3.2.0
github.com/ProtonMail/gluon v0.14.1
github.com/ProtonMail/gluon v0.15.1-0.20230331095629-e23a7a1be2a8
github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8
github.com/ProtonMail/go-srp v0.0.5
github.com/ProtonMail/gopenpgp/v2 v2.5.2
@@ -27,41 +27,52 @@ require (
)
require (
ariga.io/atlas v0.9.1-0.20230119145809-92243f7c55cb // indirect
entgo.io/ent v0.11.8 // indirect
github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf // indirect
github.com/ProtonMail/go-mime v0.0.0-20221031134845-8fd9bc37cf08 // indirect
github.com/agext/levenshtein v1.2.3 // indirect
github.com/andybalholm/cascadia v1.3.1 // indirect
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
github.com/bytedance/sonic v1.8.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/cloudflare/circl v1.3.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/cronokirby/saferith v0.33.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emersion/go-imap v1.2.1 // indirect
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-openapi/inflect v0.19.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.11.2 // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/hashicorp/hcl/v2 v2.16.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.10 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/zclconf/go-cty v1.12.1 // indirect
golang.org/x/arch v0.2.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace github.com/ProtonMail/gluon => /home/dev/gopath18/src/gluon

29
go.sum
View File

@@ -1,3 +1,7 @@
ariga.io/atlas v0.9.1-0.20230119145809-92243f7c55cb h1:mbsFtavDqGdYwdDpP50LGOOZ2hgyGoJcZeOpbgKMyu4=
ariga.io/atlas v0.9.1-0.20230119145809-92243f7c55cb/go.mod h1:T230JFcENj4ZZzMkZrXFDSkv+2kXkUgpJ5FQQ5hMcKU=
entgo.io/ent v0.11.8 h1:M/M0QL1CYCUSdqGRXUrXhFYSDRJPsOOrr+RLEej/gyQ=
entgo.io/ent v0.11.8/go.mod h1:ericBi6Q8l3wBH1wEIDfKxw7rcQEuRPyBfbIzjtxJ18=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g=
github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
@@ -6,6 +10,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs
github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo=
github.com/ProtonMail/gluon v0.14.1 h1:7iXLasYoBGaOW88k4p29w7bPAOpY6aR6GErdVRu8MZs=
github.com/ProtonMail/gluon v0.14.1/go.mod h1:z2AxLIiBCT1K+0OBHyaDI7AEaO5qI6/BEC2TE42vs4Q=
github.com/ProtonMail/gluon v0.15.1-0.20230331095629-e23a7a1be2a8 h1:USMR8imbxkP4Ailch4ceV3hCZTaANMIGHhb5rpZFYn4=
github.com/ProtonMail/gluon v0.15.1-0.20230331095629-e23a7a1be2a8/go.mod h1:yA4hk6CJw0BMo+YL8Y3ckCYs5L20sysu9xseshwY3QI=
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo=
github.com/ProtonMail/go-crypto v0.0.0-20230124153114-0acdc8ae009b/go.mod h1:I0gYDMZ6Z5GRU7l58bNFSkPTFN6Yl12dsUlAZ8xy98g=
github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 h1:wPbRQzjjwFc0ih8puEVAOFGELsn1zoIIYdxvML7mDxA=
@@ -18,8 +24,12 @@ github.com/ProtonMail/gopenpgp/v2 v2.5.2 h1:97SjlWNAxXl9P22lgwgrZRshQdiEfAht0g3Z
github.com/ProtonMail/gopenpgp/v2 v2.5.2/go.mod h1:52qDaCnto6r+CoWbuU50T77XQt99lIs46HtHtvgFO3o=
github.com/PuerkitoBio/goquery v1.8.1 h1:uQxhNlArOIdbrH1tr0UXwdVFgDcZDrZVdcpygAcwmWM=
github.com/PuerkitoBio/goquery v1.8.1/go.mod h1:Q8ICL1kNUJ2sXGoAhPGUdYDJvgQgHzJsnnd3H7Ho5jQ=
github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo=
github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw=
github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo=
github.com/bradenaw/juniper v0.10.2 h1:EY7r8SJJrigJ7lvWk6ews3K5RD4XTG9z+WSwHJKijP4=
github.com/bradenaw/juniper v0.10.2/go.mod h1:Z2B7aJlQ7xbfWsnMLROj5t/5FQ94/MkIdKC30J4WvzI=
github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
@@ -39,8 +49,12 @@ github.com/cronokirby/saferith v0.33.0/go.mod h1:QKJhjoqUtBsXCAVEjw38mFqoi7DebT7
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emersion/go-imap v1.2.1 h1:+s9ZjMEjOB8NzZMVTM3cCenz2JrQIGGo5j1df19WjTA=
github.com/emersion/go-imap v1.2.1/go.mod h1:Qlx1FSx2FTxjnjWpIlVNEuX+ylerZQNFE5NsmKFSejY=
github.com/emersion/go-message v0.15.0/go.mod h1:wQUEfE+38+7EW8p8aZ96ptg6bAb1iwdgej19uXASlE4=
github.com/emersion/go-message v0.16.0 h1:uZLz8ClLv3V5fSFF/fFdW9jXjrZkXIpE1Fn8fKx7pO4=
github.com/emersion/go-message v0.16.0/go.mod h1:pDJDgf/xeUIF+eicT6B/hPX/ZbEorKkUMPOxrPVG2eQ=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 h1:IbFBtwoTQyw0fIM5xv1HF+Y+3ZijDR839WMulgxCcUY=
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594/go.mod h1:aqO8z8wPrjkscevZJFVE1wXJrLpC5LtJG7fqLOsPb2U=
github.com/emersion/go-vcard v0.0.0-20220507122617-d4056df0ec4a h1:cltZpe6s0SJtqK5c/5y2VrIYi8BAtDM6qjmiGYqfTik=
@@ -49,6 +63,8 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8=
github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH89961k=
github.com/go-openapi/inflect v0.19.0 h1:9jCH9scKIbHeV9m12SmPilScz6krDxKRasNNSNPXu/4=
github.com/go-openapi/inflect v0.19.0/go.mod h1:lHpZVlpIQqLyKwJ4N+YSc9hchQy/i12fJykb83CRBH4=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
@@ -65,9 +81,12 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl/v2 v2.16.1 h1:BwuxEMD/tsYgbhIW7UuI3crjovf3MzuFWiVgiv57iHg=
github.com/hashicorp/hcl/v2 v2.16.1/go.mod h1:JRmR89jycNkrrqnMmvPDMd56n1rQJ2Q6KocSLCMCXng=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -79,6 +98,10 @@ github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=
github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -86,6 +109,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -114,6 +139,8 @@ github.com/urfave/cli/v2 v2.24.4/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6f
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zclconf/go-cty v1.12.1 h1:PcupnljUm9EIvbgSHQnHhUr3fO6oFmkOrvs2BAFNXXY=
github.com/zclconf/go-cty v1.12.1/go.mod h1:s9IfD1LK5ccNMSWCVFCE2rJfHiZgi7JijgeWIMfhLvA=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
@@ -134,6 +161,8 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
golang.org/x/mobile v0.0.0-20221110043201-43a038452099/go.mod h1:aAjjkJNdrh3PMckS4B10TGS2nag27cbKR1y2BpUxsiY=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=

View File

@@ -6,7 +6,7 @@ import (
"runtime"
"testing"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/go-proton-api"
"github.com/bradenaw/juniper/iterator"
"github.com/bradenaw/juniper/stream"
@@ -32,7 +32,7 @@ func createTestMessages(t *testing.T, c *proton.Client, pass string, count int)
keyPass, err := salt.SaltForKey([]byte(pass), user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, keyPass, queue.NoopPanicHandler{})
_, addrKRs, err := proton.Unlock(user, addr, keyPass, async.NoopPanicHandler{})
require.NoError(t, err)
req := iterator.Collect(iterator.Map(iterator.Counter(count), func(i int) proton.ImportReq {

View File

@@ -6,7 +6,7 @@ import (
"net"
"sync"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/go-resty/resty/v2"
)
@@ -21,7 +21,7 @@ type Manager struct {
verifyProofs bool
panicHandler queue.PanicHandler
panicHandler async.PanicHandler
}
func New(opts ...Option) *Manager {
@@ -131,9 +131,3 @@ func (m *Manager) onConnUp() {
observer(m.status)
}
}
func (m *Manager) handlePanic() {
if m.panicHandler != nil {
m.panicHandler.HandlePanic()
}
}

View File

@@ -4,7 +4,7 @@ import (
"net/http"
"time"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/go-resty/resty/v2"
)
@@ -26,7 +26,7 @@ type managerBuilder struct {
retryCount int
logger resty.Logger
debug bool
panicHandler queue.PanicHandler
panicHandler async.PanicHandler
}
func newManagerBuilder() *managerBuilder {
@@ -39,7 +39,7 @@ func newManagerBuilder() *managerBuilder {
retryCount: 3,
logger: nil,
debug: false,
panicHandler: queue.NoopPanicHandler{},
panicHandler: async.NoopPanicHandler{},
}
}

View File

@@ -7,6 +7,7 @@ import (
"runtime"
"strconv"
"github.com/ProtonMail/gluon/async"
"github.com/bradenaw/juniper/parallel"
"github.com/bradenaw/juniper/xslices"
"github.com/go-resty/resty/v2"
@@ -87,7 +88,7 @@ func (c *Client) DeleteMessage(ctx context.Context, messageIDs ...string) error
pages := xslices.Chunk(messageIDs, maxPageSize)
return parallel.DoContext(ctx, runtime.NumCPU(), len(pages), func(ctx context.Context, idx int) error {
defer c.m.handlePanic()
defer async.HandlePanic(c.m.panicHandler)
return c.do(ctx, func(r *resty.Request) (*resty.Response, error) {
return r.SetBody(MessageActionReq{IDs: pages[idx]}).Put("/mail/v4/messages/delete")
@@ -99,7 +100,7 @@ func (c *Client) MarkMessagesRead(ctx context.Context, messageIDs ...string) err
pages := xslices.Chunk(messageIDs, maxPageSize)
return parallel.DoContext(ctx, runtime.NumCPU(), len(pages), func(ctx context.Context, idx int) error {
defer c.m.handlePanic()
defer async.HandlePanic(c.m.panicHandler)
return c.do(ctx, func(r *resty.Request) (*resty.Response, error) {
return r.SetBody(MessageActionReq{IDs: pages[idx]}).Put("/mail/v4/messages/read")
@@ -111,7 +112,7 @@ func (c *Client) MarkMessagesUnread(ctx context.Context, messageIDs ...string) e
pages := xslices.Chunk(messageIDs, maxPageSize)
return parallel.DoContext(ctx, runtime.NumCPU(), len(pages), func(ctx context.Context, idx int) error {
defer c.m.handlePanic()
defer async.HandlePanic(c.m.panicHandler)
req := MessageActionReq{IDs: pages[idx]}
@@ -131,7 +132,7 @@ func (c *Client) LabelMessages(ctx context.Context, messageIDs []string, labelID
runtime.NumCPU(),
xslices.Chunk(messageIDs, maxPageSize),
func(ctx context.Context, messageIDs []string) (LabelMessagesRes, error) {
defer c.m.handlePanic()
defer async.HandlePanic(c.m.panicHandler)
var res LabelMessagesRes
@@ -172,7 +173,7 @@ func (c *Client) UnlabelMessages(ctx context.Context, messageIDs []string, label
runtime.NumCPU(),
xslices.Chunk(messageIDs, maxPageSize),
func(ctx context.Context, messageIDs []string) (LabelMessagesRes, error) {
defer c.m.handlePanic()
defer async.HandlePanic(c.m.panicHandler)
var res LabelMessagesRes

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/bradenaw/juniper/iterator"
"github.com/bradenaw/juniper/parallel"
@@ -45,7 +46,7 @@ func (c *Client) ImportMessages(ctx context.Context, addrKR *crypto.KeyRing, wor
workers,
buffer,
func(ctx context.Context, req []ImportReq) (stream.Stream[ImportRes], error) {
defer c.m.handlePanic()
defer async.HandlePanic(c.m.panicHandler)
res, err := c.importMessages(ctx, req)
if err != nil {

View File

@@ -3,7 +3,7 @@ package proton
import (
"net/http"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/go-resty/resty/v2"
)
@@ -124,14 +124,14 @@ func (opt withDebug) config(builder *managerBuilder) {
builder.debug = opt.debug
}
func WithPanicHandler(panicHandler queue.PanicHandler) Option {
func WithPanicHandler(panicHandler async.PanicHandler) Option {
return &withPanicHandler{
panicHandler: panicHandler,
}
}
type withPanicHandler struct {
panicHandler queue.PanicHandler
panicHandler async.PanicHandler
}
func (opt withPanicHandler) config(builder *managerBuilder) {

View File

@@ -4,6 +4,7 @@ import (
"context"
"runtime"
"github.com/ProtonMail/gluon/async"
"github.com/bradenaw/juniper/iterator"
"github.com/bradenaw/juniper/parallel"
"github.com/bradenaw/juniper/stream"
@@ -22,7 +23,7 @@ func fetchPaged[T any](
runtime.NumCPU(),
runtime.NumCPU(),
func(ctx context.Context, page int) (stream.Stream[T], error) {
defer c.m.handlePanic()
defer async.HandlePanic(c.m.panicHandler)
values, err := fn(ctx, page, pageSize)
if err != nil {

20
pool.go
View File

@@ -6,7 +6,7 @@ import (
"fmt"
"sync"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
)
// ErrJobCancelled indicates the job was cancelled.
@@ -14,25 +14,25 @@ var ErrJobCancelled = errors.New("job cancelled by surrounding context")
// Pool is a worker pool that handles input of type In and returns results of type Out.
type Pool[In comparable, Out any] struct {
queue *queue.QueuedChannel[*job[In, Out]]
queue *async.QueuedChannel[*job[In, Out]]
wg sync.WaitGroup
panicHandler queue.PanicHandler
panicHandler async.PanicHandler
}
// doneFunc must be called to free up pool resources.
type doneFunc func()
// New returns a new pool.
func NewPool[In comparable, Out any](size int, panicHandler queue.PanicHandler, work func(context.Context, In) (Out, error)) *Pool[In, Out] {
func NewPool[In comparable, Out any](size int, panicHandler async.PanicHandler, work func(context.Context, In) (Out, error)) *Pool[In, Out] {
pool := &Pool[In, Out]{
queue: queue.NewQueuedChannel[*job[In, Out]](0, 0, panicHandler),
queue: async.NewQueuedChannel[*job[In, Out]](0, 0, panicHandler),
}
for i := 0; i < size; i++ {
pool.wg.Add(1)
go func() {
defer pool.handlePanic()
defer async.HandlePanic(pool.panicHandler)
defer pool.wg.Done()
@@ -58,12 +58,6 @@ func NewPool[In comparable, Out any](size int, panicHandler queue.PanicHandler,
return pool
}
func (pool *Pool[In, Out]) handlePanic() {
if pool.panicHandler != nil {
pool.panicHandler.HandlePanic()
}
}
// Process submits jobs to the pool. The callback provides access to the result, or an error if one occurred.
func (pool *Pool[In, Out]) Process(ctx context.Context, reqs []In, fn func(int, In, Out, error) error) error {
ctx, cancel := context.WithCancel(ctx)
@@ -81,7 +75,7 @@ func (pool *Pool[In, Out]) Process(ctx context.Context, reqs []In, fn func(int,
wg.Add(1)
go func(index int) {
defer pool.handlePanic()
defer async.HandlePanic(pool.panicHandler)
defer wg.Done()

View File

@@ -8,7 +8,7 @@ import (
"testing"
"time"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -154,7 +154,7 @@ func TestPool_ProcessAll(t *testing.T) {
}
func newDoubler(workers int, delay ...time.Duration) *Pool[int, int] {
return NewPool(workers, queue.NoopPanicHandler{}, func(ctx context.Context, req int) (int, error) {
return NewPool(workers, async.NoopPanicHandler{}, func(ctx context.Context, req int) (int, error) {
if len(delay) > 0 {
time.Sleep(delay[0])
}
@@ -164,7 +164,7 @@ func newDoubler(workers int, delay ...time.Duration) *Pool[int, int] {
}
func newDoublerWithError(workers int) *Pool[int, int] {
return NewPool(workers, queue.NoopPanicHandler{}, func(ctx context.Context, req int) (int, error) {
return NewPool(workers, async.NoopPanicHandler{}, func(ctx context.Context, req int) (int, error) {
if req%2 == 0 {
return 0, errors.New("oops")
}

View File

@@ -19,7 +19,7 @@ import (
"github.com/bradenaw/juniper/parallel"
"github.com/Masterminds/semver/v3"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/rfc822"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/gopenpgp/v2/crypto"
@@ -613,7 +613,7 @@ func TestServer_CreateMessage(t *testing.T) {
pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{})
_, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{})
require.NoError(t, err)
draft, err := c.CreateDraft(ctx, addrKRs[addr[0].ID], proton.CreateDraftReq{
@@ -649,7 +649,7 @@ func TestServer_UpdateDraft(t *testing.T) {
pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{})
_, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{})
require.NoError(t, err)
// Create the draft.
@@ -725,7 +725,7 @@ func TestServer_SendMessage(t *testing.T) {
pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{})
_, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{})
require.NoError(t, err)
draft, err := c.CreateDraft(ctx, addrKRs[addr[0].ID], proton.CreateDraftReq{
@@ -808,7 +808,7 @@ func TestServer_Import(t *testing.T) {
pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{})
_, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{})
require.NoError(t, err)
res := importMessages(ctx, t, c, addr[0].ID, addrKRs[addr[0].ID], []string{}, proton.MessageFlagReceived, 1)
@@ -1027,7 +1027,7 @@ func TestServer_Labels(t *testing.T) {
pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{})
_, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{})
require.NoError(t, err)
for _, tt := range tests {
@@ -1169,7 +1169,7 @@ func TestServer_Import_FlagsAndLabels(t *testing.T) {
pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{})
_, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{})
require.NoError(t, err)
for _, tt := range tests {
@@ -1846,7 +1846,7 @@ func withMessages(ctx context.Context, t *testing.T, c *proton.Client, pass stri
keyPass, err := salt.SaltForKey([]byte(pass), user.Keys.Primary().ID)
require.NoError(t, err)
_, addrKRs, err := proton.Unlock(user, addr, keyPass, queue.NoopPanicHandler{})
_, addrKRs, err := proton.Unlock(user, addr, keyPass, async.NoopPanicHandler{})
require.NoError(t, err)
fn(xslices.Map(importMessages(ctx, t, c, addr[0].ID, addrKRs[addr[0].ID], []string{}, proton.MessageFlagReceived, count), func(res proton.ImportRes) string {

View File

@@ -4,7 +4,7 @@ import (
"math/rand"
"time"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
)
type Ticker struct {
@@ -16,7 +16,7 @@ type Ticker struct {
// NewTicker returns a new ticker that ticks at a random time between period and period+jitter.
// It can be stopped by closing calling Stop().
func NewTicker(period, jitter time.Duration, panicHandler queue.PanicHandler) *Ticker {
func NewTicker(period, jitter time.Duration, panicHandler async.PanicHandler) *Ticker {
t := &Ticker{
C: make(chan time.Time),
stopCh: make(chan struct{}),
@@ -24,11 +24,7 @@ func NewTicker(period, jitter time.Duration, panicHandler queue.PanicHandler) *T
}
go func() {
defer func() {
if panicHandler != nil {
panicHandler.HandlePanic()
}
}()
defer async.HandlePanic(panicHandler)
defer close(t.doneCh)

View File

@@ -4,12 +4,12 @@ import (
"fmt"
"runtime"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/bradenaw/juniper/parallel"
)
func Unlock(user User, addresses []Address, saltedKeyPass []byte, panicHandler queue.PanicHandler) (*crypto.KeyRing, map[string]*crypto.KeyRing, error) {
func Unlock(user User, addresses []Address, saltedKeyPass []byte, panicHandler async.PanicHandler) (*crypto.KeyRing, map[string]*crypto.KeyRing, error) {
userKR, err := user.Keys.Unlock(saltedKeyPass, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to unlock user keys: %w", err)
@@ -20,11 +20,7 @@ func Unlock(user User, addresses []Address, saltedKeyPass []byte, panicHandler q
addrKRs := make(map[string]*crypto.KeyRing)
for idx, addrKR := range parallel.Map(runtime.NumCPU(), addresses, func(addr Address) *crypto.KeyRing {
defer func() {
if panicHandler != nil {
panicHandler.HandlePanic()
}
}()
defer async.HandlePanic(panicHandler)
return addr.Keys.TryUnlock(saltedKeyPass, userKR)
}) {