From 3cc8423ece9c01d81e54f04ff12b18879eef8ed8 Mon Sep 17 00:00:00 2001 From: Jakub Date: Thu, 30 Mar 2023 17:54:23 +0200 Subject: [PATCH] refactor(GODT-2500): Reorganise async methods. --- attachment_interfaces.go | 14 ++++---------- event.go | 3 ++- future.go | 22 +++++++++------------- future_test.go | 6 +++--- go.mod | 17 ++++++++++++++--- go.sum | 29 +++++++++++++++++++++++++++++ helper_test.go | 4 ++-- manager.go | 10 ++-------- manager_builder.go | 6 +++--- message.go | 11 ++++++----- message_import.go | 3 ++- option.go | 6 +++--- paging.go | 3 ++- pool.go | 20 +++++++------------- pool_test.go | 6 +++--- server/server_test.go | 16 ++++++++-------- ticker.go | 10 +++------- unlock.go | 10 +++------- 18 files changed, 105 insertions(+), 91 deletions(-) diff --git a/attachment_interfaces.go b/attachment_interfaces.go index 7e1d34c..4b8585f 100644 --- a/attachment_interfaces.go +++ b/attachment_interfaces.go @@ -4,7 +4,7 @@ import ( "bytes" "context" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/bradenaw/juniper/parallel" ) @@ -59,10 +59,10 @@ func (SequentialScheduler) Schedule(ctx context.Context, attachmentIDs []string, type ParallelScheduler struct { workers int - panicHandler queue.PanicHandler + panicHandler async.PanicHandler } -func NewParallelScheduler(workers int, panicHandler queue.PanicHandler) *ParallelScheduler { +func NewParallelScheduler(workers int, panicHandler async.PanicHandler) *ParallelScheduler { if workers == 0 { workers = 1 } @@ -70,12 +70,6 @@ func NewParallelScheduler(workers int, panicHandler queue.PanicHandler) *Paralle return &ParallelScheduler{workers: workers} } -func (p *ParallelScheduler) handlePanic() { - if p.panicHandler != nil { - p.panicHandler.HandlePanic() - } -} - func (p ParallelScheduler) Schedule(ctx context.Context, attachmentIDs []string, storageProvider AttachmentAllocator, downloader func(context.Context, string, *bytes.Buffer) error) ([]*bytes.Buffer, error) { // If we have less attachments than the maximum works, reduce worker count to match attachment count. workers := p.workers @@ -84,7 +78,7 @@ func (p ParallelScheduler) Schedule(ctx context.Context, attachmentIDs []string, } return parallel.MapContext(ctx, workers, attachmentIDs, func(ctx context.Context, id string) (*bytes.Buffer, error) { - defer p.handlePanic() + defer async.HandlePanic(p.panicHandler) buffer := storageProvider.NewBuffer() if err := downloader(ctx, id, buffer); err != nil { diff --git a/event.go b/event.go index 63f9bdb..25b8038 100644 --- a/event.go +++ b/event.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/ProtonMail/gluon/async" "github.com/go-resty/resty/v2" ) @@ -60,7 +61,7 @@ func (c *Client) NewEventStream(ctx context.Context, period, jitter time.Duratio eventCh := make(chan Event) go func() { - defer c.m.handlePanic() + defer async.HandlePanic(c.m.panicHandler) defer close(eventCh) diff --git a/future.go b/future.go index 6bf7dd8..8a56aa9 100644 --- a/future.go +++ b/future.go @@ -1,10 +1,12 @@ package proton -import "github.com/ProtonMail/gluon/queue" +import ( + "github.com/ProtonMail/gluon/async" +) type Future[T any] struct { resCh chan res[T] - panicHandler queue.PanicHandler + panicHandler async.PanicHandler } type res[T any] struct { @@ -12,7 +14,7 @@ type res[T any] struct { err error } -func NewFuture[T any](panicHandler queue.PanicHandler, fn func() (T, error)) *Future[T] { +func NewFuture[T any](panicHandler async.PanicHandler, fn func() (T, error)) *Future[T] { resCh := make(chan res[T]) job := &Future[T]{ resCh: resCh, @@ -20,7 +22,7 @@ func NewFuture[T any](panicHandler queue.PanicHandler, fn func() (T, error)) *Fu } go func() { - defer job.handlePanic() + defer async.HandlePanic(job.panicHandler) val, err := fn() @@ -32,7 +34,7 @@ func NewFuture[T any](panicHandler queue.PanicHandler, fn func() (T, error)) *Fu func (job *Future[T]) Then(fn func(T, error)) { go func() { - defer job.handlePanic() + defer async.HandlePanic(job.panicHandler) res := <-job.resCh @@ -40,12 +42,6 @@ func (job *Future[T]) Then(fn func(T, error)) { }() } -func (job *Future[T]) handlePanic() { - if job.panicHandler != nil { - job.panicHandler.HandlePanic() - } -} - func (job *Future[T]) Get() (T, error) { res := <-job.resCh @@ -54,10 +50,10 @@ func (job *Future[T]) Get() (T, error) { type Group[T any] struct { futures []*Future[T] - panicHandler queue.PanicHandler + panicHandler async.PanicHandler } -func NewGroup[T any](panicHandler queue.PanicHandler) *Group[T] { +func NewGroup[T any](panicHandler async.PanicHandler) *Group[T] { return &Group[T]{panicHandler: panicHandler} } diff --git a/future_test.go b/future_test.go index db55b26..841cd8c 100644 --- a/future_test.go +++ b/future_test.go @@ -5,14 +5,14 @@ import ( "testing" "time" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/stretchr/testify/require" ) func TestFuture(t *testing.T) { resCh := make(chan int) - NewFuture(queue.NoopPanicHandler{}, func() (int, error) { + NewFuture(async.NoopPanicHandler{}, func() (int, error) { return 42, nil }).Then(func(res int, err error) { resCh <- res @@ -22,7 +22,7 @@ func TestFuture(t *testing.T) { } func TestGroup(t *testing.T) { - group := NewGroup[int](queue.NoopPanicHandler{}) + group := NewGroup[int](async.NoopPanicHandler{}) for i := 0; i < 10; i++ { i := i diff --git a/go.mod b/go.mod index 9cc2f1d..24af8ed 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/Masterminds/semver/v3 v3.2.0 - github.com/ProtonMail/gluon v0.14.1 + github.com/ProtonMail/gluon v0.15.1-0.20230331095629-e23a7a1be2a8 github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 github.com/ProtonMail/go-srp v0.0.5 github.com/ProtonMail/gopenpgp/v2 v2.5.2 @@ -27,41 +27,52 @@ require ( ) require ( + ariga.io/atlas v0.9.1-0.20230119145809-92243f7c55cb // indirect + entgo.io/ent v0.11.8 // indirect github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf // indirect github.com/ProtonMail/go-mime v0.0.0-20221031134845-8fd9bc37cf08 // indirect + github.com/agext/levenshtein v1.2.3 // indirect github.com/andybalholm/cascadia v1.3.1 // indirect + github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect github.com/bytedance/sonic v1.8.1 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cloudflare/circl v1.3.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cronokirby/saferith v0.33.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emersion/go-imap v1.2.1 // indirect github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 // indirect github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-openapi/inflect v0.19.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.11.2 // indirect github.com/goccy/go-json v0.10.0 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.9 // indirect + github.com/hashicorp/hcl/v2 v2.16.1 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/mattn/go-isatty v0.0.17 // indirect + github.com/mattn/go-sqlite3 v1.14.16 // indirect + github.com/mitchellh/go-wordwrap v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.10 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + github.com/zclconf/go-cty v1.12.1 // indirect golang.org/x/arch v0.2.0 // indirect golang.org/x/crypto v0.6.0 // indirect + golang.org/x/mod v0.8.0 // indirect golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.5.0 // indirect google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/ProtonMail/gluon => /home/dev/gopath18/src/gluon diff --git a/go.sum b/go.sum index 0d85d08..ca939ad 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +ariga.io/atlas v0.9.1-0.20230119145809-92243f7c55cb h1:mbsFtavDqGdYwdDpP50LGOOZ2hgyGoJcZeOpbgKMyu4= +ariga.io/atlas v0.9.1-0.20230119145809-92243f7c55cb/go.mod h1:T230JFcENj4ZZzMkZrXFDSkv+2kXkUgpJ5FQQ5hMcKU= +entgo.io/ent v0.11.8 h1:M/M0QL1CYCUSdqGRXUrXhFYSDRJPsOOrr+RLEej/gyQ= +entgo.io/ent v0.11.8/go.mod h1:ericBi6Q8l3wBH1wEIDfKxw7rcQEuRPyBfbIzjtxJ18= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g= github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= @@ -6,6 +10,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo= github.com/ProtonMail/gluon v0.14.1 h1:7iXLasYoBGaOW88k4p29w7bPAOpY6aR6GErdVRu8MZs= github.com/ProtonMail/gluon v0.14.1/go.mod h1:z2AxLIiBCT1K+0OBHyaDI7AEaO5qI6/BEC2TE42vs4Q= +github.com/ProtonMail/gluon v0.15.1-0.20230331095629-e23a7a1be2a8 h1:USMR8imbxkP4Ailch4ceV3hCZTaANMIGHhb5rpZFYn4= +github.com/ProtonMail/gluon v0.15.1-0.20230331095629-e23a7a1be2a8/go.mod h1:yA4hk6CJw0BMo+YL8Y3ckCYs5L20sysu9xseshwY3QI= github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo= github.com/ProtonMail/go-crypto v0.0.0-20230124153114-0acdc8ae009b/go.mod h1:I0gYDMZ6Z5GRU7l58bNFSkPTFN6Yl12dsUlAZ8xy98g= github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 h1:wPbRQzjjwFc0ih8puEVAOFGELsn1zoIIYdxvML7mDxA= @@ -18,8 +24,12 @@ github.com/ProtonMail/gopenpgp/v2 v2.5.2 h1:97SjlWNAxXl9P22lgwgrZRshQdiEfAht0g3Z github.com/ProtonMail/gopenpgp/v2 v2.5.2/go.mod h1:52qDaCnto6r+CoWbuU50T77XQt99lIs46HtHtvgFO3o= github.com/PuerkitoBio/goquery v1.8.1 h1:uQxhNlArOIdbrH1tr0UXwdVFgDcZDrZVdcpygAcwmWM= github.com/PuerkitoBio/goquery v1.8.1/go.mod h1:Q8ICL1kNUJ2sXGoAhPGUdYDJvgQgHzJsnnd3H7Ho5jQ= +github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo= +github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c= github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA= +github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw= +github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= github.com/bradenaw/juniper v0.10.2 h1:EY7r8SJJrigJ7lvWk6ews3K5RD4XTG9z+WSwHJKijP4= github.com/bradenaw/juniper v0.10.2/go.mod h1:Z2B7aJlQ7xbfWsnMLROj5t/5FQ94/MkIdKC30J4WvzI= github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= @@ -39,8 +49,12 @@ github.com/cronokirby/saferith v0.33.0/go.mod h1:QKJhjoqUtBsXCAVEjw38mFqoi7DebT7 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emersion/go-imap v1.2.1 h1:+s9ZjMEjOB8NzZMVTM3cCenz2JrQIGGo5j1df19WjTA= +github.com/emersion/go-imap v1.2.1/go.mod h1:Qlx1FSx2FTxjnjWpIlVNEuX+ylerZQNFE5NsmKFSejY= +github.com/emersion/go-message v0.15.0/go.mod h1:wQUEfE+38+7EW8p8aZ96ptg6bAb1iwdgej19uXASlE4= github.com/emersion/go-message v0.16.0 h1:uZLz8ClLv3V5fSFF/fFdW9jXjrZkXIpE1Fn8fKx7pO4= github.com/emersion/go-message v0.16.0/go.mod h1:pDJDgf/xeUIF+eicT6B/hPX/ZbEorKkUMPOxrPVG2eQ= +github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ= github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 h1:IbFBtwoTQyw0fIM5xv1HF+Y+3ZijDR839WMulgxCcUY= github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594/go.mod h1:aqO8z8wPrjkscevZJFVE1wXJrLpC5LtJG7fqLOsPb2U= github.com/emersion/go-vcard v0.0.0-20220507122617-d4056df0ec4a h1:cltZpe6s0SJtqK5c/5y2VrIYi8BAtDM6qjmiGYqfTik= @@ -49,6 +63,8 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8= github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH89961k= +github.com/go-openapi/inflect v0.19.0 h1:9jCH9scKIbHeV9m12SmPilScz6krDxKRasNNSNPXu/4= +github.com/go-openapi/inflect v0.19.0/go.mod h1:lHpZVlpIQqLyKwJ4N+YSc9hchQy/i12fJykb83CRBH4= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= @@ -65,9 +81,12 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/hcl/v2 v2.16.1 h1:BwuxEMD/tsYgbhIW7UuI3crjovf3MzuFWiVgiv57iHg= +github.com/hashicorp/hcl/v2 v2.16.1/go.mod h1:JRmR89jycNkrrqnMmvPDMd56n1rQJ2Q6KocSLCMCXng= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -79,6 +98,10 @@ github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= +github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -86,6 +109,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU= github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -114,6 +139,8 @@ github.com/urfave/cli/v2 v2.24.4/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6f github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/zclconf/go-cty v1.12.1 h1:PcupnljUm9EIvbgSHQnHhUr3fO6oFmkOrvs2BAFNXXY= +github.com/zclconf/go-cty v1.12.1/go.mod h1:s9IfD1LK5ccNMSWCVFCE2rJfHiZgi7JijgeWIMfhLvA= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= @@ -134,6 +161,8 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU golang.org/x/mobile v0.0.0-20221110043201-43a038452099/go.mod h1:aAjjkJNdrh3PMckS4B10TGS2nag27cbKR1y2BpUxsiY= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= diff --git a/helper_test.go b/helper_test.go index eef3de0..26057ce 100644 --- a/helper_test.go +++ b/helper_test.go @@ -6,7 +6,7 @@ import ( "runtime" "testing" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/ProtonMail/go-proton-api" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" @@ -32,7 +32,7 @@ func createTestMessages(t *testing.T, c *proton.Client, pass string, count int) keyPass, err := salt.SaltForKey([]byte(pass), user.Keys.Primary().ID) require.NoError(t, err) - _, addrKRs, err := proton.Unlock(user, addr, keyPass, queue.NoopPanicHandler{}) + _, addrKRs, err := proton.Unlock(user, addr, keyPass, async.NoopPanicHandler{}) require.NoError(t, err) req := iterator.Collect(iterator.Map(iterator.Counter(count), func(i int) proton.ImportReq { diff --git a/manager.go b/manager.go index 3294e2c..4a32022 100644 --- a/manager.go +++ b/manager.go @@ -6,7 +6,7 @@ import ( "net" "sync" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/go-resty/resty/v2" ) @@ -21,7 +21,7 @@ type Manager struct { verifyProofs bool - panicHandler queue.PanicHandler + panicHandler async.PanicHandler } func New(opts ...Option) *Manager { @@ -131,9 +131,3 @@ func (m *Manager) onConnUp() { observer(m.status) } } - -func (m *Manager) handlePanic() { - if m.panicHandler != nil { - m.panicHandler.HandlePanic() - } -} diff --git a/manager_builder.go b/manager_builder.go index 01cf480..c782751 100644 --- a/manager_builder.go +++ b/manager_builder.go @@ -4,7 +4,7 @@ import ( "net/http" "time" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/go-resty/resty/v2" ) @@ -26,7 +26,7 @@ type managerBuilder struct { retryCount int logger resty.Logger debug bool - panicHandler queue.PanicHandler + panicHandler async.PanicHandler } func newManagerBuilder() *managerBuilder { @@ -39,7 +39,7 @@ func newManagerBuilder() *managerBuilder { retryCount: 3, logger: nil, debug: false, - panicHandler: queue.NoopPanicHandler{}, + panicHandler: async.NoopPanicHandler{}, } } diff --git a/message.go b/message.go index 6e97056..30e3bfd 100644 --- a/message.go +++ b/message.go @@ -7,6 +7,7 @@ import ( "runtime" "strconv" + "github.com/ProtonMail/gluon/async" "github.com/bradenaw/juniper/parallel" "github.com/bradenaw/juniper/xslices" "github.com/go-resty/resty/v2" @@ -87,7 +88,7 @@ func (c *Client) DeleteMessage(ctx context.Context, messageIDs ...string) error pages := xslices.Chunk(messageIDs, maxPageSize) return parallel.DoContext(ctx, runtime.NumCPU(), len(pages), func(ctx context.Context, idx int) error { - defer c.m.handlePanic() + defer async.HandlePanic(c.m.panicHandler) return c.do(ctx, func(r *resty.Request) (*resty.Response, error) { return r.SetBody(MessageActionReq{IDs: pages[idx]}).Put("/mail/v4/messages/delete") @@ -99,7 +100,7 @@ func (c *Client) MarkMessagesRead(ctx context.Context, messageIDs ...string) err pages := xslices.Chunk(messageIDs, maxPageSize) return parallel.DoContext(ctx, runtime.NumCPU(), len(pages), func(ctx context.Context, idx int) error { - defer c.m.handlePanic() + defer async.HandlePanic(c.m.panicHandler) return c.do(ctx, func(r *resty.Request) (*resty.Response, error) { return r.SetBody(MessageActionReq{IDs: pages[idx]}).Put("/mail/v4/messages/read") @@ -111,7 +112,7 @@ func (c *Client) MarkMessagesUnread(ctx context.Context, messageIDs ...string) e pages := xslices.Chunk(messageIDs, maxPageSize) return parallel.DoContext(ctx, runtime.NumCPU(), len(pages), func(ctx context.Context, idx int) error { - defer c.m.handlePanic() + defer async.HandlePanic(c.m.panicHandler) req := MessageActionReq{IDs: pages[idx]} @@ -131,7 +132,7 @@ func (c *Client) LabelMessages(ctx context.Context, messageIDs []string, labelID runtime.NumCPU(), xslices.Chunk(messageIDs, maxPageSize), func(ctx context.Context, messageIDs []string) (LabelMessagesRes, error) { - defer c.m.handlePanic() + defer async.HandlePanic(c.m.panicHandler) var res LabelMessagesRes @@ -172,7 +173,7 @@ func (c *Client) UnlabelMessages(ctx context.Context, messageIDs []string, label runtime.NumCPU(), xslices.Chunk(messageIDs, maxPageSize), func(ctx context.Context, messageIDs []string) (LabelMessagesRes, error) { - defer c.m.handlePanic() + defer async.HandlePanic(c.m.panicHandler) var res LabelMessagesRes diff --git a/message_import.go b/message_import.go index 3e91398..ab6b2b9 100644 --- a/message_import.go +++ b/message_import.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" + "github.com/ProtonMail/gluon/async" "github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/parallel" @@ -45,7 +46,7 @@ func (c *Client) ImportMessages(ctx context.Context, addrKR *crypto.KeyRing, wor workers, buffer, func(ctx context.Context, req []ImportReq) (stream.Stream[ImportRes], error) { - defer c.m.handlePanic() + defer async.HandlePanic(c.m.panicHandler) res, err := c.importMessages(ctx, req) if err != nil { diff --git a/option.go b/option.go index 8e8249f..281d469 100644 --- a/option.go +++ b/option.go @@ -3,7 +3,7 @@ package proton import ( "net/http" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/go-resty/resty/v2" ) @@ -124,14 +124,14 @@ func (opt withDebug) config(builder *managerBuilder) { builder.debug = opt.debug } -func WithPanicHandler(panicHandler queue.PanicHandler) Option { +func WithPanicHandler(panicHandler async.PanicHandler) Option { return &withPanicHandler{ panicHandler: panicHandler, } } type withPanicHandler struct { - panicHandler queue.PanicHandler + panicHandler async.PanicHandler } func (opt withPanicHandler) config(builder *managerBuilder) { diff --git a/paging.go b/paging.go index 69a7da4..786e8f6 100644 --- a/paging.go +++ b/paging.go @@ -4,6 +4,7 @@ import ( "context" "runtime" + "github.com/ProtonMail/gluon/async" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/parallel" "github.com/bradenaw/juniper/stream" @@ -22,7 +23,7 @@ func fetchPaged[T any]( runtime.NumCPU(), runtime.NumCPU(), func(ctx context.Context, page int) (stream.Stream[T], error) { - defer c.m.handlePanic() + defer async.HandlePanic(c.m.panicHandler) values, err := fn(ctx, page, pageSize) if err != nil { diff --git a/pool.go b/pool.go index e5f7be8..da6ae1f 100644 --- a/pool.go +++ b/pool.go @@ -6,7 +6,7 @@ import ( "fmt" "sync" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" ) // ErrJobCancelled indicates the job was cancelled. @@ -14,25 +14,25 @@ var ErrJobCancelled = errors.New("job cancelled by surrounding context") // Pool is a worker pool that handles input of type In and returns results of type Out. type Pool[In comparable, Out any] struct { - queue *queue.QueuedChannel[*job[In, Out]] + queue *async.QueuedChannel[*job[In, Out]] wg sync.WaitGroup - panicHandler queue.PanicHandler + panicHandler async.PanicHandler } // doneFunc must be called to free up pool resources. type doneFunc func() // New returns a new pool. -func NewPool[In comparable, Out any](size int, panicHandler queue.PanicHandler, work func(context.Context, In) (Out, error)) *Pool[In, Out] { +func NewPool[In comparable, Out any](size int, panicHandler async.PanicHandler, work func(context.Context, In) (Out, error)) *Pool[In, Out] { pool := &Pool[In, Out]{ - queue: queue.NewQueuedChannel[*job[In, Out]](0, 0, panicHandler), + queue: async.NewQueuedChannel[*job[In, Out]](0, 0, panicHandler), } for i := 0; i < size; i++ { pool.wg.Add(1) go func() { - defer pool.handlePanic() + defer async.HandlePanic(pool.panicHandler) defer pool.wg.Done() @@ -58,12 +58,6 @@ func NewPool[In comparable, Out any](size int, panicHandler queue.PanicHandler, return pool } -func (pool *Pool[In, Out]) handlePanic() { - if pool.panicHandler != nil { - pool.panicHandler.HandlePanic() - } -} - // Process submits jobs to the pool. The callback provides access to the result, or an error if one occurred. func (pool *Pool[In, Out]) Process(ctx context.Context, reqs []In, fn func(int, In, Out, error) error) error { ctx, cancel := context.WithCancel(ctx) @@ -81,7 +75,7 @@ func (pool *Pool[In, Out]) Process(ctx context.Context, reqs []In, fn func(int, wg.Add(1) go func(index int) { - defer pool.handlePanic() + defer async.HandlePanic(pool.panicHandler) defer wg.Done() diff --git a/pool_test.go b/pool_test.go index 10dca63..e42e2b2 100644 --- a/pool_test.go +++ b/pool_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -154,7 +154,7 @@ func TestPool_ProcessAll(t *testing.T) { } func newDoubler(workers int, delay ...time.Duration) *Pool[int, int] { - return NewPool(workers, queue.NoopPanicHandler{}, func(ctx context.Context, req int) (int, error) { + return NewPool(workers, async.NoopPanicHandler{}, func(ctx context.Context, req int) (int, error) { if len(delay) > 0 { time.Sleep(delay[0]) } @@ -164,7 +164,7 @@ func newDoubler(workers int, delay ...time.Duration) *Pool[int, int] { } func newDoublerWithError(workers int) *Pool[int, int] { - return NewPool(workers, queue.NoopPanicHandler{}, func(ctx context.Context, req int) (int, error) { + return NewPool(workers, async.NoopPanicHandler{}, func(ctx context.Context, req int) (int, error) { if req%2 == 0 { return 0, errors.New("oops") } diff --git a/server/server_test.go b/server/server_test.go index 6de9f9d..bbc3313 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -19,7 +19,7 @@ import ( "github.com/bradenaw/juniper/parallel" "github.com/Masterminds/semver/v3" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/ProtonMail/gluon/rfc822" "github.com/ProtonMail/go-proton-api" "github.com/ProtonMail/gopenpgp/v2/crypto" @@ -613,7 +613,7 @@ func TestServer_CreateMessage(t *testing.T) { pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID) require.NoError(t, err) - _, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{}) + _, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{}) require.NoError(t, err) draft, err := c.CreateDraft(ctx, addrKRs[addr[0].ID], proton.CreateDraftReq{ @@ -649,7 +649,7 @@ func TestServer_UpdateDraft(t *testing.T) { pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID) require.NoError(t, err) - _, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{}) + _, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{}) require.NoError(t, err) // Create the draft. @@ -725,7 +725,7 @@ func TestServer_SendMessage(t *testing.T) { pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID) require.NoError(t, err) - _, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{}) + _, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{}) require.NoError(t, err) draft, err := c.CreateDraft(ctx, addrKRs[addr[0].ID], proton.CreateDraftReq{ @@ -808,7 +808,7 @@ func TestServer_Import(t *testing.T) { pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID) require.NoError(t, err) - _, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{}) + _, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{}) require.NoError(t, err) res := importMessages(ctx, t, c, addr[0].ID, addrKRs[addr[0].ID], []string{}, proton.MessageFlagReceived, 1) @@ -1027,7 +1027,7 @@ func TestServer_Labels(t *testing.T) { pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID) require.NoError(t, err) - _, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{}) + _, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{}) require.NoError(t, err) for _, tt := range tests { @@ -1169,7 +1169,7 @@ func TestServer_Import_FlagsAndLabels(t *testing.T) { pass, err := salt.SaltForKey([]byte("pass"), user.Keys.Primary().ID) require.NoError(t, err) - _, addrKRs, err := proton.Unlock(user, addr, pass, queue.NoopPanicHandler{}) + _, addrKRs, err := proton.Unlock(user, addr, pass, async.NoopPanicHandler{}) require.NoError(t, err) for _, tt := range tests { @@ -1846,7 +1846,7 @@ func withMessages(ctx context.Context, t *testing.T, c *proton.Client, pass stri keyPass, err := salt.SaltForKey([]byte(pass), user.Keys.Primary().ID) require.NoError(t, err) - _, addrKRs, err := proton.Unlock(user, addr, keyPass, queue.NoopPanicHandler{}) + _, addrKRs, err := proton.Unlock(user, addr, keyPass, async.NoopPanicHandler{}) require.NoError(t, err) fn(xslices.Map(importMessages(ctx, t, c, addr[0].ID, addrKRs[addr[0].ID], []string{}, proton.MessageFlagReceived, count), func(res proton.ImportRes) string { diff --git a/ticker.go b/ticker.go index 94436a2..5fb11f8 100644 --- a/ticker.go +++ b/ticker.go @@ -4,7 +4,7 @@ import ( "math/rand" "time" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" ) type Ticker struct { @@ -16,7 +16,7 @@ type Ticker struct { // NewTicker returns a new ticker that ticks at a random time between period and period+jitter. // It can be stopped by closing calling Stop(). -func NewTicker(period, jitter time.Duration, panicHandler queue.PanicHandler) *Ticker { +func NewTicker(period, jitter time.Duration, panicHandler async.PanicHandler) *Ticker { t := &Ticker{ C: make(chan time.Time), stopCh: make(chan struct{}), @@ -24,11 +24,7 @@ func NewTicker(period, jitter time.Duration, panicHandler queue.PanicHandler) *T } go func() { - defer func() { - if panicHandler != nil { - panicHandler.HandlePanic() - } - }() + defer async.HandlePanic(panicHandler) defer close(t.doneCh) diff --git a/unlock.go b/unlock.go index fa72b16..f47be08 100644 --- a/unlock.go +++ b/unlock.go @@ -4,12 +4,12 @@ import ( "fmt" "runtime" - "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/async" "github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/bradenaw/juniper/parallel" ) -func Unlock(user User, addresses []Address, saltedKeyPass []byte, panicHandler queue.PanicHandler) (*crypto.KeyRing, map[string]*crypto.KeyRing, error) { +func Unlock(user User, addresses []Address, saltedKeyPass []byte, panicHandler async.PanicHandler) (*crypto.KeyRing, map[string]*crypto.KeyRing, error) { userKR, err := user.Keys.Unlock(saltedKeyPass, nil) if err != nil { return nil, nil, fmt.Errorf("failed to unlock user keys: %w", err) @@ -20,11 +20,7 @@ func Unlock(user User, addresses []Address, saltedKeyPass []byte, panicHandler q addrKRs := make(map[string]*crypto.KeyRing) for idx, addrKR := range parallel.Map(runtime.NumCPU(), addresses, func(addr Address) *crypto.KeyRing { - defer func() { - if panicHandler != nil { - panicHandler.HandlePanic() - } - }() + defer async.HandlePanic(panicHandler) return addr.Keys.TryUnlock(saltedKeyPass, userKR) }) {