mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-02-24 02:56:52 -05:00
Compare commits
2 Commits
groupware
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26a23d5c61 | ||
|
|
372bb04ee8 |
2
go.mod
2
go.mod
@@ -56,7 +56,7 @@ require (
|
||||
github.com/mna/pigeon v1.3.0
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
|
||||
github.com/nats-io/nats-server/v2 v2.12.4
|
||||
github.com/nats-io/nats.go v1.48.0
|
||||
github.com/nats-io/nats.go v1.49.0
|
||||
github.com/oklog/run v1.2.0
|
||||
github.com/olekukonko/tablewriter v1.1.3
|
||||
github.com/onsi/ginkgo v1.16.5
|
||||
|
||||
4
go.sum
4
go.sum
@@ -916,8 +916,8 @@ github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
|
||||
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
|
||||
github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts=
|
||||
github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg=
|
||||
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
|
||||
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE=
|
||||
github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw=
|
||||
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
|
||||
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
|
||||
@@ -71,25 +71,25 @@
|
||||
]
|
||||
},
|
||||
"dependencies": {
|
||||
"@fontsource/roboto": "^5.2.5",
|
||||
"@fontsource/roboto": "^5.2.9",
|
||||
"@material-ui/core": "^4.12.4",
|
||||
"@material-ui/icons": "^4.11.3",
|
||||
"@testing-library/jest-dom": "^6.6.3",
|
||||
"@testing-library/jest-dom": "^6.9.1",
|
||||
"@testing-library/react": "^12.1.5",
|
||||
"@testing-library/user-event": "^14.6.1",
|
||||
"@types/jest": "^29.5.14",
|
||||
"@types/jest": "^30.0.0",
|
||||
"@types/node": "^22.15.30",
|
||||
"@types/react": "^17.0.80",
|
||||
"@types/react-dom": "^17.0.25",
|
||||
"@types/react-redux": "^7.1.33",
|
||||
"@types/react-redux": "^7.1.34",
|
||||
"@types/redux-logger": "^3.0.13",
|
||||
"axios": "^1.8.2",
|
||||
"axios": "^1.13.5",
|
||||
"classnames": "^2.5.1",
|
||||
"i18next": "^25.2.1",
|
||||
"i18next-browser-languagedetector": "^8.1.0",
|
||||
"i18next": "^25.8.11",
|
||||
"i18next-browser-languagedetector": "^8.2.1",
|
||||
"i18next-http-backend": "^3.0.2",
|
||||
"i18next-resources-to-backend": "^1.2.1",
|
||||
"query-string": "^9.2.0",
|
||||
"query-string": "^9.3.1",
|
||||
"react": "^17.0.2",
|
||||
"react-app-polyfill": "^3.0.0",
|
||||
"react-dom": "^17.0.2",
|
||||
@@ -101,7 +101,7 @@
|
||||
"redux-logger": "^3.0.6",
|
||||
"redux-thunk": "^2.4.2",
|
||||
"render-if": "^0.1.1",
|
||||
"web-vitals": "^5.0.2"
|
||||
"web-vitals": "^5.1.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@babel/core": "7.26.10",
|
||||
@@ -113,16 +113,16 @@
|
||||
"babel-preset-react-app": "^10.1.0",
|
||||
"case-sensitive-paths-webpack-plugin": "2.4.0",
|
||||
"cldr": "^7.9.0",
|
||||
"css-loader": "7.1.2",
|
||||
"css-minimizer-webpack-plugin": "^7.0.2",
|
||||
"css-loader": "^7.1.4",
|
||||
"css-minimizer-webpack-plugin": "^7.0.4",
|
||||
"dotenv": "16.4.7",
|
||||
"dotenv-expand": "12.0.2",
|
||||
"dotenv-expand": "^12.0.3",
|
||||
"eslint": "^7.32.0",
|
||||
"eslint-config-react-app": "^6.0.0",
|
||||
"eslint-loader": "^4.0.2",
|
||||
"eslint-plugin-flowtype": "^5.10.0",
|
||||
"eslint-plugin-i18next": "^6.1.1",
|
||||
"eslint-plugin-import": "^2.31.0",
|
||||
"eslint-plugin-i18next": "^6.1.3",
|
||||
"eslint-plugin-import": "^2.32.0",
|
||||
"eslint-plugin-jest": "^24.7.0",
|
||||
"eslint-plugin-jsx-a11y": "^6.10.2",
|
||||
"eslint-plugin-react": "^7.37.5",
|
||||
@@ -130,10 +130,10 @@
|
||||
"eslint-plugin-testing-library": "^3.10.2",
|
||||
"eslint-webpack-plugin": "^3.2.0",
|
||||
"file-loader": "6.2.0",
|
||||
"html-webpack-plugin": "5.6.3",
|
||||
"html-webpack-plugin": "^5.6.6",
|
||||
"i18next-conv": "^15.1.1",
|
||||
"i18next-parser": "^9.3.0",
|
||||
"jest": "30.0.0",
|
||||
"jest": "30.2.0",
|
||||
"license-checker-rseidelsohn": "4.4.2",
|
||||
"mini-css-extract-plugin": "2.9.2",
|
||||
"pnp-webpack-plugin": "1.7.0",
|
||||
@@ -143,15 +143,15 @@
|
||||
"postcss-preset-env": "10.1.3",
|
||||
"postcss-safe-parser": "7.0.1",
|
||||
"react-dev-utils": "^12.0.1",
|
||||
"resolve": "1.22.8",
|
||||
"resolve": "^1.22.11",
|
||||
"resolve-url-loader": "^5.0.0",
|
||||
"sass-loader": "^16.0.5",
|
||||
"sass-loader": "^16.0.7",
|
||||
"source-map-explorer": "^2.5.3",
|
||||
"typescript": "^5.8.3",
|
||||
"url-loader": "4.1.1",
|
||||
"webpack": "5.99.6",
|
||||
"webpack": "5.105.2",
|
||||
"webpack-manifest-plugin": "5.0.0",
|
||||
"workbox-webpack-plugin": "7.1.0"
|
||||
"workbox-webpack-plugin": "7.4.0"
|
||||
},
|
||||
"packageManager": "pnpm@9.15.4"
|
||||
}
|
||||
|
||||
2438
services/idp/pnpm-lock.yaml
generated
2438
services/idp/pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
24
vendor/github.com/nats-io/nats.go/README.md
generated
vendored
24
vendor/github.com/nats-io/nats.go/README.md
generated
vendored
@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
|
||||
go get github.com/nats-io/nats.go@latest
|
||||
|
||||
# To get a specific version:
|
||||
go get github.com/nats-io/nats.go@v1.48.0
|
||||
go get github.com/nats-io/nats.go@v1.49.0
|
||||
|
||||
# Note that the latest major version for NATS Server is v2:
|
||||
go get github.com/nats-io/nats-server/v2@latest
|
||||
@@ -134,7 +134,7 @@ The simplest form is to use the helper method UserCredentials(credsFilepath).
|
||||
nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))
|
||||
```
|
||||
|
||||
The helper methods creates two callback handlers to present the user JWT and sign the nonce challenge from the server.
|
||||
The helper method creates two callback handlers to present the user JWT and sign the nonce challenge from the server.
|
||||
The core client library never has direct access to your private key and simply performs the callback for signing the server challenge.
|
||||
The helper will load and wipe and erase memory it uses for each connect or reconnect.
|
||||
|
||||
@@ -177,7 +177,7 @@ nc, err := nats.Connect("tls://nats.demo.io:4443")
|
||||
// We provide a helper method to make this case easier.
|
||||
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))
|
||||
|
||||
// If the server requires client certificate, there is an helper function for that too:
|
||||
// If the server requires client certificate, there is a helper function for that too:
|
||||
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
|
||||
nc, err = nats.Connect("tls://localhost:4443", cert)
|
||||
|
||||
@@ -210,17 +210,17 @@ if err != nil {
|
||||
|
||||
// "*" matches any token, at any level of the subject.
|
||||
nc.Subscribe("foo.*.baz", func(m *Msg) {
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
|
||||
})
|
||||
|
||||
nc.Subscribe("foo.bar.*", func(m *Msg) {
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
|
||||
})
|
||||
|
||||
// ">" matches any length of the tail of a subject, and can only be the last token
|
||||
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
|
||||
nc.Subscribe("foo.>", func(m *Msg) {
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
|
||||
})
|
||||
|
||||
// Matches all of the above
|
||||
@@ -237,7 +237,7 @@ nc.Publish("foo.bar.baz", []byte("Hello World"))
|
||||
// Normal subscribers will continue to work as expected.
|
||||
|
||||
nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
|
||||
received += 1;
|
||||
received += 1
|
||||
})
|
||||
```
|
||||
|
||||
@@ -267,9 +267,9 @@ fmt.Println("All clear!")
|
||||
// FlushTimeout specifies a timeout value as well.
|
||||
err := nc.FlushTimeout(1*time.Second)
|
||||
if err != nil {
|
||||
fmt.Println("All clear!")
|
||||
} else {
|
||||
fmt.Println("Flushed timed out!")
|
||||
} else {
|
||||
fmt.Println("All clear!")
|
||||
}
|
||||
|
||||
// Auto-unsubscribe after MAX_WANTED messages received
|
||||
@@ -285,7 +285,7 @@ nc1.Subscribe("foo", func(m *Msg) {
|
||||
fmt.Printf("Received a message: %s\n", string(m.Data))
|
||||
})
|
||||
|
||||
nc2.Publish("foo", []byte("Hello World!"));
|
||||
nc2.Publish("foo", []byte("Hello World!"))
|
||||
|
||||
```
|
||||
|
||||
@@ -339,7 +339,7 @@ nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))
|
||||
// For token based authentication:
|
||||
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))
|
||||
|
||||
// You can even pass the two at the same time in case one of the server
|
||||
// You can even pass the two at the same time in case one of the servers
|
||||
// in the mesh requires token instead of user name and password.
|
||||
nc, err = nats.Connect("nats://localhost:4222",
|
||||
nats.UserInfo("foo", "bar"),
|
||||
@@ -372,7 +372,7 @@ msg, err := sub.NextMsgWithContext(ctx)
|
||||
|
||||
```
|
||||
|
||||
## Backwards compatibility
|
||||
## Backward compatibility
|
||||
|
||||
In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard go compatibility guidelines.
|
||||
However, it's important to clarify our stance on certain types of changes:
|
||||
|
||||
18
vendor/github.com/nats-io/nats.go/go_test.mod
generated
vendored
18
vendor/github.com/nats-io/nats.go/go_test.mod
generated
vendored
@@ -4,19 +4,19 @@ go 1.24.0
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/klauspost/compress v1.18.0
|
||||
github.com/klauspost/compress v1.18.2
|
||||
github.com/nats-io/jwt/v2 v2.8.0
|
||||
github.com/nats-io/nats-server/v2 v2.12.0
|
||||
github.com/nats-io/nkeys v0.4.11
|
||||
github.com/nats-io/nats-server/v2 v2.12.3
|
||||
github.com/nats-io/nkeys v0.4.12
|
||||
github.com/nats-io/nuid v1.0.1
|
||||
google.golang.org/protobuf v1.23.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op // indirect
|
||||
github.com/google/go-tpm v0.9.5 // indirect
|
||||
github.com/minio/highwayhash v1.0.3 // indirect
|
||||
golang.org/x/crypto v0.42.0 // indirect
|
||||
golang.org/x/sys v0.36.0 // indirect
|
||||
golang.org/x/time v0.13.0 // indirect
|
||||
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op // indirect
|
||||
github.com/google/go-tpm v0.9.7 // indirect
|
||||
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
|
||||
golang.org/x/crypto v0.46.0 // indirect
|
||||
golang.org/x/sys v0.39.0 // indirect
|
||||
golang.org/x/time v0.14.0 // indirect
|
||||
)
|
||||
|
||||
36
vendor/github.com/nats-io/nats.go/go_test.sum
generated
vendored
36
vendor/github.com/nats-io/nats.go/go_test.sum
generated
vendored
@@ -1,5 +1,5 @@
|
||||
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0=
|
||||
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
|
||||
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM=
|
||||
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
@@ -12,27 +12,27 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
|
||||
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
|
||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
|
||||
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
|
||||
github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA=
|
||||
github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
|
||||
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
|
||||
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
|
||||
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
|
||||
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
|
||||
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
|
||||
github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74=
|
||||
github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
|
||||
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
|
||||
github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs=
|
||||
github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y=
|
||||
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
|
||||
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
|
||||
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
|
||||
golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU=
|
||||
golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0=
|
||||
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
|
||||
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
|
||||
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
|
||||
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
|
||||
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
|
||||
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
|
||||
749
vendor/github.com/nats-io/nats.go/jetstream/MIGRATION.md
generated
vendored
Normal file
749
vendor/github.com/nats-io/nats.go/jetstream/MIGRATION.md
generated
vendored
Normal file
@@ -0,0 +1,749 @@
|
||||
# Migrating from Legacy JetStream API to `jetstream` Package
|
||||
|
||||
This guide helps you migrate from the legacy JetStream API in the `nats` package
|
||||
(`nats.JetStreamContext`) to the new `jetstream` package
|
||||
(`github.com/nats-io/nats.go/jetstream`).
|
||||
|
||||
- [Why Migrate?](#why-migrate)
|
||||
- [Getting Started](#getting-started)
|
||||
- [Stream Management](#stream-management)
|
||||
- [Consumer Management](#consumer-management)
|
||||
- [Publishing](#publishing)
|
||||
- [Consuming Messages](#consuming-messages)
|
||||
- [Replacing js.Subscribe()](#replacing-jssubscribe)
|
||||
- [Replacing js.PullSubscribe()](#replacing-jspullsubscribe)
|
||||
- [Ordered Consumers](#ordered-consumers)
|
||||
- [Push Consumers](#push-consumers)
|
||||
- [Subscription Options Mapping](#subscription-options-mapping)
|
||||
- [Error Handling in Consume/Messages](#error-handling-in-consumemessages)
|
||||
- [Message Acknowledgement](#message-acknowledgement)
|
||||
- [KeyValue Store](#keyvalue-store)
|
||||
- [Object Store](#object-store)
|
||||
|
||||
## Why Migrate?
|
||||
|
||||
The legacy JetStream API (`nats.JetStreamContext`) is deprecated. The `jetstream`
|
||||
package provides a cleaner, more predictable API with several key improvements:
|
||||
|
||||
- **Explicit resource management.** Streams and consumers are created and managed
|
||||
explicitly. The legacy `js.Subscribe()` implicitly created consumers behind
|
||||
the scenes, leading to surprising behavior.
|
||||
|
||||
- **Pull consumers as the default.** Pull consumers with `Consume()` and
|
||||
`Messages()` provide the same continuous message delivery as the legacy push-based
|
||||
`Subscribe()`, but with better flow control and no slow consumer issues.
|
||||
|
||||
- **`context.Context` throughout.** All API calls accept `context.Context` for
|
||||
timeout and cancellation, replacing the mix of `MaxWait`, `AckWait`, and
|
||||
`Context()` options.
|
||||
|
||||
- **Clear interface separation.** Instead of one large `JetStreamContext` interface,
|
||||
functionality is split across focused interfaces: `JetStream`, `Stream` and
|
||||
`Consumer`.
|
||||
|
||||
## Getting Started
|
||||
|
||||
The core NATS connection remains unchanged. Only the JetStream initialization
|
||||
differs:
|
||||
|
||||
```go
|
||||
import (
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
nc, _ := nats.Connect(nats.DefaultURL)
|
||||
```
|
||||
|
||||
**Legacy:**
|
||||
|
||||
```go
|
||||
js, _ := nc.JetStream()
|
||||
|
||||
// With domain
|
||||
js, _ := nc.JetStream(nats.Domain("hub"))
|
||||
|
||||
// With custom API prefix
|
||||
js, _ := nc.JetStream(nats.APIPrefix("myprefix"))
|
||||
```
|
||||
|
||||
**New:**
|
||||
|
||||
```go
|
||||
js, _ := jetstream.New(nc)
|
||||
|
||||
// With domain
|
||||
js, _ := jetstream.NewWithDomain(nc, "hub")
|
||||
|
||||
// With custom API prefix
|
||||
js, _ := jetstream.NewWithAPIPrefix(nc, "myprefix")
|
||||
```
|
||||
|
||||
### Initialization Options
|
||||
|
||||
| Legacy | New |
|
||||
|-----------------------------------|--------------------------------------------|
|
||||
| `nats.Domain(domain)` | `jetstream.NewWithDomain(nc, domain)` |
|
||||
| `nats.APIPrefix(prefix)` | `jetstream.NewWithAPIPrefix(nc, prefix)` |
|
||||
| `nats.PublishAsyncMaxPending(n)` | `jetstream.WithPublishAsyncMaxPending(n)` |
|
||||
| `nats.PublishAsyncErrHandler(cb)` | `jetstream.WithPublishAsyncErrHandler(cb)` |
|
||||
|
||||
## Stream Management
|
||||
|
||||
`StreamConfig` is essentially the same struct — it just lives in the `jetstream`
|
||||
package now. The new API takes `StreamConfig` by value (not pointer) and
|
||||
management methods return a `Stream` handle instead of `*StreamInfo`.
|
||||
|
||||
| Legacy | New | Notes |
|
||||
|---------------------------------|-------------------------------------|-------------------------------------------------------------------|
|
||||
| `js.AddStream(cfg)` | `js.CreateStream(ctx, cfg)` | Also: `CreateOrUpdateStream()` |
|
||||
| `js.UpdateStream(cfg)` | `js.UpdateStream(ctx, cfg)` | |
|
||||
| `js.DeleteStream(name)` | `js.DeleteStream(ctx, name)` | |
|
||||
| `js.StreamInfo(name)` | `s.Info(ctx)` / `s.CachedInfo()` | Get stream handle first via `js.Stream(ctx, name)` |
|
||||
| `js.PurgeStream(name, opts...)` | `s.Purge(ctx, opts...)` | Options: `WithPurgeSubject`, `WithPurgeSequence`, `WithPurgeKeep` |
|
||||
| `js.GetMsg(name, seq)` | `s.GetMsg(ctx, seq)` | |
|
||||
| `js.GetLastMsg(name, subj)` | `s.GetLastMsgForSubject(ctx, subj)` | |
|
||||
| `js.DeleteMsg(name, seq)` | `s.DeleteMsg(ctx, seq)` | Also: `s.SecureDeleteMsg()` |
|
||||
| `js.Streams()` | `js.ListStreams(ctx)` | Returns lister with `.Info()` channel and `.Err()` |
|
||||
| `js.StreamNames()` | `js.StreamNames(ctx)` | Returns lister with `.Name()` channel and `.Err()` |
|
||||
|
||||
The key architectural difference is that stream-specific operations (purge, get/delete
|
||||
messages) now live on the `Stream` interface instead of the top-level context. Get
|
||||
a stream handle first, then operate on it:
|
||||
|
||||
```go
|
||||
s, _ := js.Stream(ctx, "ORDERS")
|
||||
s.Purge(ctx)
|
||||
msg, _ := s.GetMsg(ctx, 100)
|
||||
```
|
||||
|
||||
## Consumer Management
|
||||
|
||||
The biggest conceptual change: in the legacy API, `js.Subscribe()` would
|
||||
implicitly create consumers. In the new API, consumer creation is always explicit
|
||||
and separate from message consumption.
|
||||
|
||||
| Legacy | New | Notes |
|
||||
|------------------------------------------|----------------------------------------|----------------------------------------------------------------|
|
||||
| `js.AddConsumer(stream, cfg)` | `js.CreateConsumer(ctx, stream, cfg)` | Also: `CreateOrUpdateConsumer()`, `UpdateConsumer()` |
|
||||
| `js.Subscribe(subj, handler)` (implicit) | No equivalent | Must create consumer explicitly first |
|
||||
| `js.ConsumerInfo(stream, name)` | `cons.Info(ctx)` / `cons.CachedInfo()` | Get consumer handle first via `js.Consumer(ctx, stream, name)` |
|
||||
| `js.DeleteConsumer(stream, name)` | `js.DeleteConsumer(ctx, stream, name)` | |
|
||||
| `js.Consumers(stream)` | `s.ListConsumers(ctx)` | Returns lister with `.Info()` channel and `.Err()` |
|
||||
| `js.ConsumerNames(stream)` | `s.ConsumerNames(ctx)` | Returns lister with `.Name()` channel and `.Err()` |
|
||||
|
||||
Consumer management is available at two levels:
|
||||
|
||||
- On `JetStream` — requires stream name as parameter (e.g. `js.CreateConsumer(ctx, "ORDERS", cfg)`), bypassing the need to fetch a stream
|
||||
- On `Stream` — no stream name needed (e.g. `s.CreateConsumer(ctx, cfg)`)
|
||||
|
||||
The new API provides three creation methods:
|
||||
|
||||
- `CreateConsumer` — fails if the consumer already exists with different config
|
||||
- `UpdateConsumer` - fails if the consumer does not exist
|
||||
- `CreateOrUpdateConsumer` — creates or updates as needed
|
||||
|
||||
**Additional notes on consumer behavior:**
|
||||
|
||||
- The default ack policy changed between the APIs. In the legacy API,
|
||||
`AddConsumer()` defaulted to `AckNone`. In the new API, the default is
|
||||
`AckExplicit`.
|
||||
|
||||
- In the legacy API, `sub.Unsubscribe()` on an implicitly created
|
||||
consumer would automatically delete that consumer on the server. The new API
|
||||
does not perform any automatic cleanup - consumers must be deleted explicitly
|
||||
via `DeleteConsumer()`, or via `InactiveThreshold` on the consumer
|
||||
config to let the server remove it automatically after a period of inactivity.
|
||||
|
||||
Push consumers use separate methods: `CreatePushConsumer`, `CreateOrUpdatePushConsumer`,
|
||||
`UpdatePushConsumer`, and `PushConsumer` (for getting a handle).
|
||||
|
||||
```go
|
||||
s, _ := js.Stream(ctx, "ORDERS")
|
||||
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "processor",
|
||||
})
|
||||
```
|
||||
|
||||
## Publishing
|
||||
|
||||
Publishing is largely the same, with the addition of `context.Context` for
|
||||
synchronous operations.
|
||||
|
||||
### Synchronous Publish
|
||||
|
||||
**Legacy:**
|
||||
|
||||
```go
|
||||
ack, _ := js.Publish("ORDERS.new", []byte("hello"))
|
||||
ack, _ = js.PublishMsg(&nats.Msg{
|
||||
Subject: "ORDERS.new",
|
||||
Data: []byte("hello"),
|
||||
})
|
||||
```
|
||||
|
||||
**New:**
|
||||
|
||||
```go
|
||||
ack, _ := js.Publish(ctx, "ORDERS.new", []byte("hello"))
|
||||
ack, _ = js.PublishMsg(ctx, &nats.Msg{
|
||||
Subject: "ORDERS.new",
|
||||
Data: []byte("hello"),
|
||||
})
|
||||
```
|
||||
|
||||
### Async Publish
|
||||
|
||||
**Legacy:**
|
||||
|
||||
```go
|
||||
ackF, _ := js.PublishAsync("ORDERS.new", []byte("hello"))
|
||||
|
||||
select {
|
||||
case ack := <-ackF.Ok():
|
||||
fmt.Println(ack.Sequence)
|
||||
case err := <-ackF.Err():
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
// Wait for all pending acks
|
||||
<-js.PublishAsyncComplete()
|
||||
```
|
||||
|
||||
**New:**
|
||||
|
||||
```go
|
||||
// Async publish does not take context (returns immediately)
|
||||
ackF, _ := js.PublishAsync("ORDERS.new", []byte("hello"))
|
||||
|
||||
select {
|
||||
case ack := <-ackF.Ok():
|
||||
fmt.Println(ack.Sequence)
|
||||
case err := <-ackF.Err():
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
<-js.PublishAsyncComplete()
|
||||
```
|
||||
|
||||
### Publish Options
|
||||
|
||||
| Legacy | New |
|
||||
|------------------------------------------|---------------------------------------------------|
|
||||
| `nats.MsgId(id)` | `jetstream.WithMsgID(id)` |
|
||||
| `nats.ExpectStream(name)` | `jetstream.WithExpectStream(name)` |
|
||||
| `nats.ExpectLastSequence(seq)` | `jetstream.WithExpectLastSequence(seq)` |
|
||||
| `nats.ExpectLastSequencePerSubject(seq)` | `jetstream.WithExpectLastSequencePerSubject(seq)` |
|
||||
| `nats.ExpectLastMsgId(id)` | `jetstream.WithExpectLastMsgID(id)` |
|
||||
| `nats.RetryWait(dur)` | `jetstream.WithRetryWait(dur)` |
|
||||
| `nats.RetryAttempts(n)` | `jetstream.WithRetryAttempts(n)` |
|
||||
| `nats.StallWait(dur)` | `jetstream.WithStallWait(dur)` |
|
||||
|
||||
## Consuming Messages
|
||||
|
||||
This is the most significant area of change. The legacy API offered many
|
||||
subscription flavors (`Subscribe`, `SubscribeSync`, `QueueSubscribe`,
|
||||
`ChanSubscribe`, `PullSubscribe`) that blurred the line between consumer
|
||||
creation, stream lookup and message consumption. The new API separates these
|
||||
concerns: first create a consumer, then choose how to receive messages.
|
||||
|
||||
With the exception of PullSubscribe, all legacy subscription flavors utilized push consumers under the hood. The new API recommends pull consumers for all use cases, as they provide better flow control and no risk of slow consumer issues. Pull-based consumption is available via `Consume()` and `Messages()`, which maintain persistent pull subscriptions with pre-buffering for efficient continuous delivery. Push consumers are still supported for users who prefer that model, but pull consumers are the recommended default.
|
||||
|
||||
### Replacing `js.Subscribe()`
|
||||
|
||||
The legacy `js.Subscribe()` created a push consumer behind the scenes (unless
|
||||
explicitly specified otherwise via `nats.Bind()` or `nats.Durable()`) and
|
||||
delivered messages either via a callback. In the new API, the recommended
|
||||
replacement is a **pull consumer** with `Consume()` or `Messages()`. These
|
||||
provide the same continuous delivery with better flow control.
|
||||
|
||||
#### Legacy: callback subscription
|
||||
|
||||
```go
|
||||
sub, _ := js.Subscribe("ORDERS.*", func(msg *nats.Msg) {
|
||||
fmt.Printf("Received: %s\n", string(msg.Data))
|
||||
msg.Ack()
|
||||
}, nats.Durable("processor"), nats.ManualAck)
|
||||
defer sub.Unsubscribe()
|
||||
```
|
||||
|
||||
#### New: callback with `Consume()`
|
||||
|
||||
`Consume()` is the closest equivalent to `js.Subscribe()` — it delivers messages
|
||||
to a callback function continuously.
|
||||
|
||||
```go
|
||||
s, _ := js.Stream(ctx, "ORDERS")
|
||||
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "processor",
|
||||
FilterSubject: "ORDERS.*",
|
||||
})
|
||||
|
||||
cc, _ := cons.Consume(func(msg jetstream.Msg) {
|
||||
fmt.Printf("Received: %s\n", string(msg.Data()))
|
||||
msg.Ack()
|
||||
})
|
||||
defer cc.Stop()
|
||||
```
|
||||
|
||||
> Note: `ManualAck()` is not needed — messages are never auto-acknowledged in
|
||||
> the new API.
|
||||
|
||||
#### New: iterator with `Messages()`
|
||||
|
||||
`Messages()` provides an iterator-based approach, useful when you want explicit
|
||||
control over when the next message is fetched.
|
||||
|
||||
```go
|
||||
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "processor",
|
||||
FilterSubject: "ORDERS.*",
|
||||
})
|
||||
|
||||
iter, _ := cons.Messages()
|
||||
for {
|
||||
msg, err := iter.Next()
|
||||
if err != nil {
|
||||
// handle error
|
||||
}
|
||||
fmt.Printf("Received: %s\n", string(msg.Data()))
|
||||
msg.Ack()
|
||||
}
|
||||
// Call iter.Stop() when done
|
||||
```
|
||||
|
||||
Both `Consume()` and `Messages()` maintain overlapping pull requests to the
|
||||
server, providing efficient continuous delivery without gaps.
|
||||
|
||||
#### Legacy: synchronous subscription
|
||||
|
||||
```go
|
||||
sub, _ := js.SubscribeSync("ORDERS.*", nats.Durable("processor"))
|
||||
msg, _ := sub.NextMsg(time.Second)
|
||||
```
|
||||
|
||||
**New:** Use `Messages()` and call `Next()`:
|
||||
|
||||
```go
|
||||
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "processor",
|
||||
FilterSubject: "ORDERS.*",
|
||||
})
|
||||
|
||||
iter, _ := cons.Messages()
|
||||
msg, _ := iter.Next()
|
||||
```
|
||||
|
||||
#### Legacy: queue subscription
|
||||
|
||||
```go
|
||||
// Multiple instances share work via a queue group
|
||||
sub, _ := js.QueueSubscribe("ORDERS.*", "workers", handler,
|
||||
nats.Durable("processor"))
|
||||
```
|
||||
|
||||
**New with pull consumers:** With pull consumers, there is no need for an
|
||||
explicit queue group. Multiple application instances (or goroutines) calling
|
||||
`Consume()` or `Messages()` on the same durable consumer will naturally
|
||||
distribute messages among themselves — the server tracks pending acknowledgements
|
||||
and avoids delivering the same message to multiple consumers:
|
||||
|
||||
```go
|
||||
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "processor",
|
||||
})
|
||||
|
||||
cc, _ := cons.Consume(handler)
|
||||
defer cc.Stop()
|
||||
```
|
||||
|
||||
**New with push consumers:** If you need push-based queue semantics, set
|
||||
`DeliverGroup` on a push consumer — this is the direct equivalent of the legacy
|
||||
queue group:
|
||||
|
||||
```go
|
||||
cons, _ := s.CreateOrUpdatePushConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "processor",
|
||||
DeliverSubject: "deliver.orders",
|
||||
DeliverGroup: "workers",
|
||||
})
|
||||
|
||||
cc, _ := cons.Consume(handler)
|
||||
defer cc.Stop()
|
||||
```
|
||||
|
||||
> **Note:** Push consumers with `DeliverGroup` cannot be flow controlled. If you
|
||||
> experience slow consumer issues, consider using pull-based consumers instead —
|
||||
> multiple instances on the same durable consumer achieve the same work
|
||||
> distribution without the slow consumer risk.
|
||||
|
||||
#### Legacy: channel subscription
|
||||
|
||||
```go
|
||||
ch := make(chan *nats.Msg, 64)
|
||||
sub, _ := js.ChanSubscribe("ORDERS.*", ch, nats.Durable("processor"))
|
||||
|
||||
for msg := range ch {
|
||||
msg.Ack()
|
||||
}
|
||||
```
|
||||
|
||||
**New:** There is no direct channel-based equivalent. Use `Consume()` or
|
||||
`Messages()` instead.
|
||||
|
||||
### Replacing `js.PullSubscribe()`
|
||||
|
||||
The legacy pull subscription required creating a subscription and then calling
|
||||
`Fetch()` in a loop.
|
||||
|
||||
#### Legacy: pull subscribe + fetch loop
|
||||
|
||||
```go
|
||||
sub, _ := js.PullSubscribe("ORDERS.*", "processor")
|
||||
|
||||
for {
|
||||
msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
|
||||
for _, msg := range msgs {
|
||||
fmt.Printf("Received: %s\n", string(msg.Data))
|
||||
msg.Ack()
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**New with `Fetch()`/`FetchNoWait()` (one-off batch):**
|
||||
|
||||
If you specifically need one-off batch fetching, `Fetch()` is available directly
|
||||
on the consumer — no separate subscription step:
|
||||
|
||||
```go
|
||||
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "processor",
|
||||
FilterSubject: "ORDERS.*",
|
||||
})
|
||||
|
||||
// non-blocking, returns a `FetchResult` that provides messages and error
|
||||
msgs, _ := cons.Fetch(10, jetstream.FetchMaxWait(5*time.Second))
|
||||
for msg := range msgs.Messages() {
|
||||
fmt.Printf("Received: %s\n", string(msg.Data()))
|
||||
msg.Ack()
|
||||
}
|
||||
if msgs.Error() != nil {
|
||||
// handle error
|
||||
}
|
||||
```
|
||||
|
||||
> **Warning:** `Fetch()`, `FetchNoWait()`, and `FetchBytes()` are one-off,
|
||||
> single pull requests. They do not perform pre-buffering optimizations. For
|
||||
> continuous message processing, always prefer `Consume()` or `Messages()`.
|
||||
> When using `FetchBytes()`, the requested byte size must stay under the
|
||||
> client's max pending bytes limit (64MB by default), otherwise it will trigger
|
||||
> slow consumer errors on the underlying subscription.
|
||||
|
||||
### Ordered Consumers
|
||||
|
||||
Ordered consumers provide strictly ordered, gap-free message delivery. The library
|
||||
automatically recreates the underlying consumer on sequence gaps or heartbeat
|
||||
failures.
|
||||
|
||||
**Legacy:**
|
||||
|
||||
```go
|
||||
sub, _ := js.Subscribe("ORDERS.*", handler, nats.OrderedConsumer())
|
||||
```
|
||||
|
||||
**New:**
|
||||
|
||||
```go
|
||||
cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{
|
||||
FilterSubjects: []string{"ORDERS.*"},
|
||||
})
|
||||
|
||||
// Use the same consumption methods as regular consumers
|
||||
cc, _ := cons.Consume(func(msg jetstream.Msg) {
|
||||
fmt.Printf("Received: %s\n", string(msg.Data()))
|
||||
})
|
||||
defer cc.Stop()
|
||||
```
|
||||
|
||||
### Push Consumers
|
||||
|
||||
Pull consumers are recommended for most use cases, but push consumers are also
|
||||
supported. Push consumers require `DeliverSubject` in their config and only
|
||||
support `Consume()` (not `Fetch()` or `Messages()`).
|
||||
|
||||
**Legacy:**
|
||||
|
||||
```go
|
||||
sub, _ := js.Subscribe("ORDERS.*", handler,
|
||||
nats.Durable("processor"),
|
||||
nats.DeliverSubject("deliver.orders"),
|
||||
nats.IdleHeartbeat(30*time.Second),
|
||||
)
|
||||
```
|
||||
|
||||
**New:**
|
||||
|
||||
```go
|
||||
cons, _ := s.CreateOrUpdatePushConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "processor",
|
||||
FilterSubject: "ORDERS.*",
|
||||
DeliverSubject: "deliver.orders",
|
||||
IdleHeartbeat: 30 * time.Second,
|
||||
})
|
||||
|
||||
cc, _ := cons.Consume(func(msg jetstream.Msg) {
|
||||
fmt.Printf("Received: %s\n", string(msg.Data()))
|
||||
msg.Ack()
|
||||
})
|
||||
defer cc.Stop()
|
||||
```
|
||||
|
||||
### Subscription Options Mapping
|
||||
|
||||
Most legacy `SubOpt` options map directly to `ConsumerConfig` fields. Since
|
||||
consumer creation is explicit, these are set at creation time rather than passed
|
||||
as subscription options.
|
||||
|
||||
| Legacy SubOpt | New ConsumerConfig field |
|
||||
|-------------------------------------|---------------------------------------------------------------------------|
|
||||
| `nats.Durable("name")` | `Durable: "name"` |
|
||||
| `nats.ConsumerName("name")` | `Name: "name"` |
|
||||
| `nats.Description("desc")` | `Description: "desc"` |
|
||||
| `nats.DeliverAll()` | `DeliverPolicy: jetstream.DeliverAllPolicy` |
|
||||
| `nats.DeliverLast()` | `DeliverPolicy: jetstream.DeliverLastPolicy` |
|
||||
| `nats.DeliverLastPerSubject()` | `DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy` |
|
||||
| `nats.DeliverNew()` | `DeliverPolicy: jetstream.DeliverNewPolicy` |
|
||||
| `nats.StartSequence(seq)` | `DeliverPolicy: jetstream.DeliverByStartSequencePolicy, OptStartSeq: seq` |
|
||||
| `nats.StartTime(t)` | `DeliverPolicy: jetstream.DeliverByStartTimePolicy, OptStartTime: &t` |
|
||||
| `nats.AckExplicit()` | `AckPolicy: jetstream.AckExplicitPolicy` |
|
||||
| `nats.AckAll()` | `AckPolicy: jetstream.AckAllPolicy` |
|
||||
| `nats.AckNone()` | `AckPolicy: jetstream.AckNonePolicy` |
|
||||
| `nats.ManualAck()` | Not needed (messages are never auto-acked) |
|
||||
| `nats.MaxDeliver(n)` | `MaxDeliver: n` |
|
||||
| `nats.MaxAckPending(n)` | `MaxAckPending: n` |
|
||||
| `nats.BackOff(durations)` | `BackOff: durations` |
|
||||
| `nats.ReplayOriginal()` | `ReplayPolicy: jetstream.ReplayOriginalPolicy` |
|
||||
| `nats.ReplayInstant()` | `ReplayPolicy: jetstream.ReplayInstantPolicy` |
|
||||
| `nats.RateLimit(bps)` | `RateLimit: bps` |
|
||||
| `nats.HeadersOnly()` | `HeadersOnly: true` |
|
||||
| `nats.InactiveThreshold(dur)` | `InactiveThreshold: dur` |
|
||||
| `nats.ConsumerFilterSubjects(s...)` | `FilterSubjects: s` |
|
||||
| `nats.ConsumerReplicas(n)` | `Replicas: n` |
|
||||
| `nats.ConsumerMemoryStorage()` | `MemoryStorage: true` |
|
||||
|
||||
The following options have no direct equivalent — use the consumer handle
|
||||
directly instead:
|
||||
|
||||
| Legacy SubOpt | New equivalent |
|
||||
|-------------------------------|---------------------------------------------------------------------|
|
||||
| `nats.Bind(stream, consumer)` | `js.Consumer(ctx, stream, consumer)` or `s.Consumer(ctx, consumer)` |
|
||||
| `nats.BindStream(stream)` | Use `js.Stream(ctx, stream)` to get a stream handle |
|
||||
| `nats.OrderedConsumer()` | `js.OrderedConsumer(ctx, stream, cfg)` |
|
||||
|
||||
### Consume/Messages Options
|
||||
|
||||
`Consume()` and `Messages()` accept options that control pull request behavior:
|
||||
|
||||
| Option | Description |
|
||||
|----------------------------|--------------------------------------------------------------|
|
||||
| `PullMaxMessages(n)` | Max messages buffered (default: 500) |
|
||||
| `PullMaxBytes(n)` | Max bytes buffered (mutually exclusive with PullMaxMessages) |
|
||||
| `PullExpiry(dur)` | Pull request timeout (default: 30s) |
|
||||
| `PullHeartbeat(dur)` | Idle heartbeat interval |
|
||||
| `PullThresholdMessages(n)` | Refill threshold (default: 50% of max) |
|
||||
| `PullThresholdBytes(n)` | Byte-based refill threshold |
|
||||
| `StopAfter(n)` | Auto-stop after N messages |
|
||||
| `ConsumeErrHandler(fn)` | Custom error handler |
|
||||
|
||||
### Error Handling in Consume/Messages
|
||||
|
||||
Both `Consume()` and `Messages()` handle server-sent status messages internally.
|
||||
Some errors are terminal (stop consumption), while others are recoverable
|
||||
(consumption continues).
|
||||
|
||||
**Terminal errors** — consumption stops automatically:
|
||||
|
||||
- `ErrConsumerDeleted` — the consumer was deleted on the server
|
||||
- `ErrBadRequest` — invalid request (e.g. misconfigured consumer)
|
||||
- Connection closed — for `Consume()` this surfaces as `ErrConnectionClosed`;
|
||||
for `Messages()`, `Next()` returns `ErrMsgIteratorClosed`
|
||||
|
||||
**Recoverable errors** — reported via error handler, consumption continues:
|
||||
|
||||
- `ErrNoHeartbeat` — missed idle heartbeats from server; a new pull request
|
||||
is issued automatically
|
||||
- `ErrConsumerLeadershipChanged` — consumer moved to a different server in the
|
||||
cluster; pending counts are reset
|
||||
- `nats.ErrNoResponders` — no JetStream service available (temporary)
|
||||
|
||||
#### Error handling with `Consume()`
|
||||
|
||||
Use `ConsumeErrHandler` to be notified about both terminal and recoverable errors:
|
||||
|
||||
```go
|
||||
cc, _ := cons.Consume(func(msg jetstream.Msg) {
|
||||
msg.Ack()
|
||||
}, jetstream.ConsumeErrHandler(func(cc jetstream.ConsumeContext, err error) {
|
||||
if errors.Is(err, jetstream.ErrConsumerDeleted) ||
|
||||
errors.Is(err, jetstream.ErrBadRequest) {
|
||||
log.Fatalf("terminal consumer error: %v", err)
|
||||
}
|
||||
log.Printf("recoverable consumer error: %v", err)
|
||||
}))
|
||||
defer cc.Stop()
|
||||
```
|
||||
|
||||
#### Error handling with `Messages()`
|
||||
|
||||
With `Messages()`, terminal errors are returned directly by `Next()`. By default,
|
||||
`ErrNoHeartbeat` is also returned by `Next()` (controlled by
|
||||
`WithMessagesErrOnMissingHeartbeat`), but it is not terminal — you can continue
|
||||
calling `Next()`:
|
||||
|
||||
```go
|
||||
iter, _ := cons.Messages()
|
||||
for {
|
||||
msg, err := iter.Next()
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
|
||||
// iterator was stopped (either explicitly or due to connection close)
|
||||
break
|
||||
}
|
||||
if errors.Is(err, jetstream.ErrNoHeartbeat) {
|
||||
// recoverable — new pull request is issued, keep going
|
||||
log.Println("missed heartbeat, re-pulling")
|
||||
continue
|
||||
}
|
||||
// ErrConsumerDeleted, ErrBadRequest are terminal
|
||||
log.Fatalf("terminal error: %v", err)
|
||||
}
|
||||
msg.Ack()
|
||||
}
|
||||
```
|
||||
|
||||
## Message Acknowledgement
|
||||
|
||||
Ack methods are similar, with minor naming changes. The main difference is that
|
||||
message fields are accessed via methods instead of struct fields.
|
||||
|
||||
| Legacy | New |
|
||||
|-------------------------|------------------------------|
|
||||
| `msg.Ack()` | Unchanged |
|
||||
| `msg.AckSync()` | `msg.DoubleAck(ctx)` |
|
||||
| `msg.Nak()` | Unchanged |
|
||||
| `msg.NakWithDelay(dur)` | Unchanged |
|
||||
| `msg.InProgress()` | Unchanged |
|
||||
| `msg.Term()` | Unchanged |
|
||||
| N/A | `msg.TermWithReason(reason)` |
|
||||
| `msg.Metadata()` | Unchanged |
|
||||
|
||||
### Accessing Message Data
|
||||
|
||||
**Legacy:** Direct struct fields on `*nats.Msg`:
|
||||
|
||||
```go
|
||||
fmt.Println(string(msg.Data))
|
||||
fmt.Println(msg.Subject)
|
||||
fmt.Println(msg.Header.Get("key"))
|
||||
```
|
||||
|
||||
**New:** Methods on `jetstream.Msg` interface:
|
||||
|
||||
```go
|
||||
fmt.Println(string(msg.Data()))
|
||||
fmt.Println(msg.Subject())
|
||||
fmt.Println(msg.Headers().Get("key"))
|
||||
```
|
||||
|
||||
## KeyValue Store
|
||||
|
||||
The KV API is nearly identical. The main changes are:
|
||||
|
||||
1. All methods take `context.Context` as the first parameter
|
||||
2. New `CreateOrUpdateKeyValue()` and `UpdateKeyValue()` methods
|
||||
3. Types live in the `jetstream` package
|
||||
|
||||
**Legacy:**
|
||||
|
||||
```go
|
||||
js, _ := nc.JetStream()
|
||||
kv, _ := js.CreateKeyValue(&nats.KeyValueConfig{
|
||||
Bucket: "profiles",
|
||||
})
|
||||
|
||||
kv.Put("sue.color", []byte("blue"))
|
||||
entry, _ := kv.Get("sue.color")
|
||||
fmt.Println(string(entry.Value()))
|
||||
|
||||
watcher, _ := kv.Watch("sue.*")
|
||||
defer watcher.Stop()
|
||||
```
|
||||
|
||||
**New:**
|
||||
|
||||
```go
|
||||
js, _ := jetstream.New(nc)
|
||||
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
|
||||
Bucket: "profiles",
|
||||
})
|
||||
|
||||
kv.Put(ctx, "sue.color", []byte("blue"))
|
||||
entry, _ := kv.Get(ctx, "sue.color")
|
||||
fmt.Println(string(entry.Value()))
|
||||
|
||||
watcher, _ := kv.Watch(ctx, "sue.*")
|
||||
defer watcher.Stop()
|
||||
```
|
||||
|
||||
### KV Management Methods
|
||||
|
||||
| Legacy | New |
|
||||
|-----------------------------|---------------------------------------|
|
||||
| `js.KeyValue(bucket)` | `js.KeyValue(ctx, bucket)` |
|
||||
| `js.CreateKeyValue(cfg)` | `js.CreateKeyValue(ctx, cfg)` |
|
||||
| N/A | `js.UpdateKeyValue(ctx, cfg)` |
|
||||
| N/A | `js.CreateOrUpdateKeyValue(ctx, cfg)` |
|
||||
| `js.DeleteKeyValue(bucket)` | `js.DeleteKeyValue(ctx, bucket)` |
|
||||
| `js.KeyValueStoreNames()` | `js.KeyValueStoreNames(ctx)` |
|
||||
| `js.KeyValueStores()` | `js.KeyValueStores(ctx)` |
|
||||
|
||||
## Object Store
|
||||
|
||||
Same pattern as KV — all methods gain `context.Context`, types move to `jetstream`
|
||||
package.
|
||||
|
||||
**Legacy:**
|
||||
|
||||
```go
|
||||
js, _ := nc.JetStream()
|
||||
os, _ := js.CreateObjectStore(&nats.ObjectStoreConfig{
|
||||
Bucket: "configs",
|
||||
})
|
||||
|
||||
os.PutString("config-1", "data")
|
||||
result, _ := os.Get("config-1")
|
||||
data, _ := io.ReadAll(result)
|
||||
```
|
||||
|
||||
**New:**
|
||||
|
||||
```go
|
||||
js, _ := jetstream.New(nc)
|
||||
os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{
|
||||
Bucket: "configs",
|
||||
})
|
||||
|
||||
os.PutString(ctx, "config-1", "data")
|
||||
result, _ := os.Get(ctx, "config-1")
|
||||
data, _ := io.ReadAll(result)
|
||||
```
|
||||
|
||||
### Object Store Management Methods
|
||||
|
||||
| Legacy | New |
|
||||
|--------------------------------|------------------------------------------|
|
||||
| `js.ObjectStore(bucket)` | `js.ObjectStore(ctx, bucket)` |
|
||||
| `js.CreateObjectStore(cfg)` | `js.CreateObjectStore(ctx, cfg)` |
|
||||
| N/A | `js.UpdateObjectStore(ctx, cfg)` |
|
||||
| N/A | `js.CreateOrUpdateObjectStore(ctx, cfg)` |
|
||||
| `js.DeleteObjectStore(bucket)` | `js.DeleteObjectStore(ctx, bucket)` |
|
||||
46
vendor/github.com/nats-io/nats.go/jetstream/README.md
generated
vendored
46
vendor/github.com/nats-io/nats.go/jetstream/README.md
generated
vendored
@@ -121,7 +121,7 @@ func main() {
|
||||
messageCounter++
|
||||
}
|
||||
|
||||
fmt.Printf("received %d messages\n", messageCounter)
|
||||
fmt.Printf("Received %d messages\n", messageCounter)
|
||||
|
||||
if msgs.Error() != nil {
|
||||
fmt.Println("Error during Fetch(): ", msgs.Error())
|
||||
@@ -224,7 +224,7 @@ _ = s.Purge(ctx, jetstream.WithPurgeSequence(100))
|
||||
_ = s.Purge(ctx, jetstream.WithPurgeKeep(10))
|
||||
```
|
||||
|
||||
- Get and messages from stream
|
||||
- Get and delete messages from a stream
|
||||
|
||||
```go
|
||||
// get message from stream with sequence number == 100
|
||||
@@ -240,7 +240,7 @@ _ = s.DeleteMsg(ctx, 100)
|
||||
- Get information about a stream
|
||||
|
||||
```go
|
||||
// Fetches latest stream info from server
|
||||
// Fetches the latest stream info from server
|
||||
info, _ := s.Info(ctx)
|
||||
fmt.Println(info.Config.Name)
|
||||
|
||||
@@ -310,7 +310,7 @@ cons2 := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
|
||||
// or an illegal property is to be updated (e.g. AckPolicy)
|
||||
updated, _ := js.UpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
|
||||
AckPolicy: jetstream.AckExplicitPolicy,
|
||||
Description: "updated consumer"
|
||||
Description: "updated consumer",
|
||||
})
|
||||
|
||||
// get consumer handle
|
||||
@@ -336,7 +336,7 @@ cons, _ := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
})
|
||||
|
||||
// get consumer handle
|
||||
cons, _ = stream.Consumer(ctx, "ORDERS", "foo")
|
||||
cons, _ = stream.Consumer(ctx, "foo")
|
||||
|
||||
// delete a consumer
|
||||
stream.DeleteConsumer(ctx, "foo")
|
||||
@@ -395,20 +395,19 @@ js, _ := jetstream.New(nc)
|
||||
// create a consumer (this is an idempotent operation)
|
||||
cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{
|
||||
// Filter results from "ORDERS" stream by specific subject
|
||||
FilterSubjects: []{"ORDERS.A"},
|
||||
FilterSubjects: []string{"ORDERS.A"},
|
||||
})
|
||||
```
|
||||
|
||||
### Receiving messages from pull consumers
|
||||
|
||||
The `Consumer` interface covers allows fetching messages on demand, with
|
||||
pre-defined batch size on bytes limit, or continuous push-like receiving of
|
||||
The `Consumer` interface allows fetching messages on demand, with a
|
||||
pre-defined batch size or byte limit, or continuous push-like receiving of
|
||||
messages.
|
||||
|
||||
#### __Single fetch__
|
||||
|
||||
This pattern pattern allows fetching a defined number of messages in a single
|
||||
RPC.
|
||||
This pattern allows fetching a defined number of messages in a single RPC.
|
||||
|
||||
- Using `Fetch` or `FetchBytes`, consumer will return up to the provided number
|
||||
of messages/bytes. By default, `Fetch()` will wait 30 seconds before timing out
|
||||
@@ -481,10 +480,10 @@ single messages on demand.
|
||||
Subject filtering is achieved by configuring a consumer with a `FilterSubject`
|
||||
value.
|
||||
|
||||
##### Using `Consume()` receive messages in a callback
|
||||
##### Using `Consume()` to receive messages in a callback
|
||||
|
||||
```go
|
||||
cons, _ := js.CreateOrUpdateConsumer("ORDERS", jetstream.ConsumerConfig{
|
||||
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
|
||||
AckPolicy: jetstream.AckExplicitPolicy,
|
||||
// receive messages from ORDERS.A subject only
|
||||
FilterSubject: "ORDERS.A"
|
||||
@@ -498,7 +497,7 @@ consContext, _ := c.Consume(func(msg jetstream.Msg) {
|
||||
defer consContext.Stop()
|
||||
```
|
||||
|
||||
Similarly to `Messages()`, `Consume()` can be supplied with options to modify
|
||||
Similar to `Messages()`, `Consume()` can be supplied with options to modify
|
||||
the behavior of a single pull request:
|
||||
|
||||
- `PullMaxMessages(int)` - up to provided number of messages will be buffered
|
||||
@@ -511,7 +510,6 @@ the behavior of a single pull request:
|
||||
request. If the value is set too low, the consumer will stall and not be able
|
||||
to consume messages.
|
||||
- `PullExpiry(time.Duration)` - timeout on a single pull request to the server
|
||||
type PullThresholdMessages int
|
||||
- `PullThresholdMessages(int)` - amount of messages which triggers refilling the
|
||||
buffer
|
||||
- `PullThresholdBytes(int)` - amount of bytes which triggers refilling the
|
||||
@@ -521,10 +519,10 @@ request. An error will be triggered if at least 2 heartbeats are missed
|
||||
- `ConsumeErrHandler(func (ConsumeContext, error))` - when used, sets a
|
||||
custom error handler on `Consume()`, allowing e.g. tracking missing
|
||||
heartbeats.
|
||||
- `PullMaxMessagesWithBytesLimit` - up to the provided number of messages will
|
||||
be buffered and a single fetch size will be limited to the provided value.
|
||||
- `PullMaxMessagesWithBytesLimit(int, int)` - up to the provided number of messages
|
||||
will be buffered and a single fetch size will be limited to the provided value.
|
||||
This is an advanced option and should be used with caution. Most of the time,
|
||||
`PullMaxMessages` or `PullMaxBytes` should be used instead. Note that he byte
|
||||
`PullMaxMessages` or `PullMaxBytes` should be used instead. Note that the byte
|
||||
limit should never be set to a value lower than the maximum message size that
|
||||
can be expected from the server. If the byte limit is lower than the maximum
|
||||
message size, the consumer will stall and not be able to consume messages.
|
||||
@@ -575,10 +573,10 @@ iter, _ := cons.Messages(jetstream.PullMaxMessages(10), jetstream.PullMaxBytes(1
|
||||
- `PullHeartbeat(time.Duration)` - idle heartbeat duration for a single pull
|
||||
request. An error will be triggered if at least 2 heartbeats are missed (unless
|
||||
`WithMessagesErrOnMissingHeartbeat(false)` is used)
|
||||
- `PullMaxMessagesWithBytesLimit` - up to the provided number of messages will
|
||||
be buffered and a single fetch size will be limited to the provided value.
|
||||
- `PullMaxMessagesWithBytesLimit(int, int)` - up to the provided number of messages
|
||||
will be buffered and a single fetch size will be limited to the provided value.
|
||||
This is an advanced option and should be used with caution. Most of the time,
|
||||
`PullMaxMessages` or `PullMaxBytes` should be used instead. Note that he byte
|
||||
`PullMaxMessages` or `PullMaxBytes` should be used instead. Note that the byte
|
||||
limit should never be set to a value lower than the maximum message size that
|
||||
can be expected from the server. If the byte limit is lower than the maximum
|
||||
message size, the consumer will stall and not be able to consume messages.
|
||||
@@ -622,7 +620,7 @@ can be set to prevent the consumer from receiving more messages than it can
|
||||
handle.
|
||||
|
||||
```go
|
||||
cons, _ := js.CreateOrUpdatePushConsumer("ORDERS", jetstream.ConsumerConfig{
|
||||
cons, _ := js.CreateOrUpdatePushConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
|
||||
DeliverSubject: nats.NewInbox()
|
||||
AckPolicy: jetstream.AckExplicitPolicy,
|
||||
// receive messages from ORDERS.A subject only
|
||||
@@ -671,7 +669,7 @@ setting various headers. Additionally, for `PublishMsg()` headers can be set
|
||||
directly on `nats.Msg`.
|
||||
|
||||
```go
|
||||
// All 3 implementations are work identically
|
||||
// All 3 implementations work identically
|
||||
ack, err := js.PublishMsg(ctx, &nats.Msg{
|
||||
Data: []byte("hello"),
|
||||
Subject: "ORDERS.new",
|
||||
@@ -972,14 +970,14 @@ js.DeleteObjectStore(ctx, "configs")
|
||||
|
||||
Object Stores support Watchers, which can be used to watch for changes on
|
||||
objects in a given bucket. Watcher will receive a notification on a channel when
|
||||
a change occurs. By default, watcher will return latest information for all
|
||||
a change occurs. By default, watcher will return the latest information for all
|
||||
objects in a bucket. After sending all initial values, watcher will send nil on
|
||||
the channel to signal that all initial values have been sent and it will start
|
||||
sending updates when changes occur.
|
||||
|
||||
>__NOTE:__ Watchers do not retrieve values for objects, only metadata (containing
|
||||
>information such as object name, bucket name, object size etc.). If object data
|
||||
>is required, `Get` method should be used.
|
||||
>is required, the `Get` method should be used.
|
||||
|
||||
Watcher supports several configuration options:
|
||||
|
||||
|
||||
6
vendor/github.com/nats-io/nats.go/jetstream/api.go
generated
vendored
6
vendor/github.com/nats-io/nats.go/jetstream/api.go
generated
vendored
@@ -50,11 +50,11 @@ const (
|
||||
// apiConsumerCreateT is used to create consumers.
|
||||
apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"
|
||||
|
||||
// apiConsumerCreateT is used to create consumers.
|
||||
// apiConsumerCreateWithFilterSubjectT is used to create consumers with a filter subject.
|
||||
// it accepts stream name, consumer name and filter subject
|
||||
apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"
|
||||
|
||||
// apiConsumerInfoT is used to create consumers.
|
||||
// apiConsumerInfoT is used to retrieve consumer information.
|
||||
apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
|
||||
|
||||
// apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
|
||||
@@ -96,7 +96,7 @@ const (
|
||||
// apiMsgGetT is the endpoint to get a message.
|
||||
apiMsgGetT = "STREAM.MSG.GET.%s"
|
||||
|
||||
// apiMsgGetT is the endpoint to perform a direct get of a message.
|
||||
// apiDirectMsgGetT is the endpoint to perform a direct get of a message.
|
||||
apiDirectMsgGetT = "DIRECT.GET.%s"
|
||||
|
||||
// apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.
|
||||
|
||||
32
vendor/github.com/nats-io/nats.go/jetstream/consumer.go
generated
vendored
32
vendor/github.com/nats-io/nats.go/jetstream/consumer.go
generated
vendored
@@ -44,13 +44,13 @@ type (
|
||||
// deletes and restarts. They provide limited configuration options
|
||||
// using [OrderedConsumerConfig].
|
||||
//
|
||||
// Consumer provides method for optimized continuous consumption of messages
|
||||
// Consumer provides methods for optimized continuous consumption of messages
|
||||
// using Consume and Messages methods, as well as simple one-off messages
|
||||
// retrieval using Fetch and Next methods.
|
||||
Consumer interface {
|
||||
// Fetch is used to retrieve up to a provided number of messages from a
|
||||
// stream. This method will send a single request and deliver either all
|
||||
// requested messages unless time out is met earlier. Fetch timeout
|
||||
// requested messages unless the timeout is met earlier. Fetch timeout
|
||||
// defaults to 30 seconds and can be configured using FetchMaxWait
|
||||
// option.
|
||||
//
|
||||
@@ -74,9 +74,9 @@ type (
|
||||
// subscription is created for each execution.
|
||||
Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
|
||||
|
||||
// FetchBytes is used to retrieve up to a provided bytes from the
|
||||
// stream. This method will send a single request and deliver the
|
||||
// provided number of bytes unless time out is met earlier. FetchBytes
|
||||
// FetchBytes is used to retrieve up to a provided number of bytes from
|
||||
// the stream. This method will send a single request and deliver the
|
||||
// provided number of bytes unless the timeout is met earlier. FetchBytes
|
||||
// timeout defaults to 30 seconds and can be configured using
|
||||
// FetchMaxWait option.
|
||||
//
|
||||
@@ -84,7 +84,7 @@ type (
|
||||
// 10 seconds. For shorter requests, the idle heartbeat is disabled.
|
||||
// This can be configured using FetchHeartbeat option. If a client does
|
||||
// not receive a heartbeat message from a stream for more than 2 times
|
||||
// the idle heartbeat setting, Fetch will return ErrNoHeartbeat.
|
||||
// the idle heartbeat setting, FetchBytes will return [ErrNoHeartbeat].
|
||||
//
|
||||
// FetchBytes is non-blocking and returns MessageBatch, exposing a channel
|
||||
// for delivered messages.
|
||||
@@ -127,7 +127,7 @@ type (
|
||||
// option, which provides information about errors encountered during
|
||||
// consumption (both transient and terminal)
|
||||
// - Consume can be configured to stop after a certain number of
|
||||
// messages is received using StopAfter option.
|
||||
// messages have been received using StopAfter option.
|
||||
// - Consume can be optimized for throughput or memory usage using
|
||||
// PullExpiry, PullMaxMessages, PullMaxBytes and PullHeartbeat options.
|
||||
// Unless there is a specific use case, these options should not be used.
|
||||
@@ -136,8 +136,8 @@ type (
|
||||
// the consumer.
|
||||
Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error)
|
||||
|
||||
// Messages returns MessagesContext, allowing continuously iterating
|
||||
// over messages on a stream. Messages can be configured using
|
||||
// Messages returns MessagesContext, allowing continuous iteration
|
||||
// over messages in a stream. Messages can be configured using
|
||||
// PullMessagesOpt options:
|
||||
//
|
||||
// - Messages can be optimized for throughput or memory usage using
|
||||
@@ -149,8 +149,8 @@ type (
|
||||
Messages(opts ...PullMessagesOpt) (MessagesContext, error)
|
||||
|
||||
// Next is used to retrieve the next message from the consumer. This
|
||||
// method will block until the message is retrieved or timeout is
|
||||
// reached.
|
||||
// method will block until the message is retrieved or the timeout
|
||||
// is reached.
|
||||
Next(opts ...FetchOpt) (Msg, error)
|
||||
|
||||
// Info fetches current ConsumerInfo from the server.
|
||||
@@ -496,12 +496,12 @@ func resumeConsumer(ctx context.Context, js *jetStream, stream, consumer string)
|
||||
return pauseConsumer(ctx, js, stream, consumer, nil)
|
||||
}
|
||||
|
||||
func validateConsumerName(dur string) error {
|
||||
if dur == "" {
|
||||
return fmt.Errorf("%w: '%s'", ErrInvalidConsumerName, "name is required")
|
||||
func validateConsumerName(name string) error {
|
||||
if name == "" {
|
||||
return fmt.Errorf("%w: name is required", ErrInvalidConsumerName)
|
||||
}
|
||||
if strings.ContainsAny(dur, ">*. /\\") {
|
||||
return fmt.Errorf("%w: '%s'", ErrInvalidConsumerName, dur)
|
||||
if strings.ContainsAny(name, ">*. /\\") {
|
||||
return fmt.Errorf("%w: '%s'", ErrInvalidConsumerName, name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
42
vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go
generated
vendored
42
vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go
generated
vendored
@@ -216,14 +216,14 @@ type (
|
||||
// settings from stream's ConsumerLimits. If neither are set, server
|
||||
// default is 5 seconds.
|
||||
//
|
||||
// A consumer is considered inactive there are not pull requests
|
||||
// received by the server (for pull consumers), or no interest detected
|
||||
// on deliver subject (for push consumers), not if there are no
|
||||
// A consumer is considered inactive if no pull requests are received by
|
||||
// the server (for pull consumers), or no interest is detected on the
|
||||
// deliver subject (for push consumers), not if there are no
|
||||
// messages to be delivered.
|
||||
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
|
||||
|
||||
// Replicas the number of replicas for the consumer's state. By default,
|
||||
// consumers inherit the number of replicas from the stream.
|
||||
// Replicas is the number of replicas for the consumer's state. By
|
||||
// default, consumers inherit the number of replicas from the stream.
|
||||
Replicas int `json:"num_replicas"`
|
||||
|
||||
// MemoryStorage is a flag to force the consumer to use memory storage
|
||||
@@ -243,19 +243,19 @@ type (
|
||||
// PauseUntil is for suspending the consumer until the deadline.
|
||||
PauseUntil *time.Time `json:"pause_until,omitempty"`
|
||||
|
||||
// PriorityPolicy represents he priority policy the consumer is set to.
|
||||
// PriorityPolicy represents the priority policy the consumer is set to.
|
||||
// Requires nats-server v2.11.0 or later.
|
||||
PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"`
|
||||
|
||||
// PinnedTTL represents the time after which the client will be unpinned
|
||||
// if no new pull requests are sent.Used with PriorityPolicyPinned.
|
||||
// if no new pull requests are sent. Used with PriorityPolicyPinned.
|
||||
// Requires nats-server v2.11.0 or later.
|
||||
PinnedTTL time.Duration `json:"priority_timeout,omitempty"`
|
||||
|
||||
// PriorityGroups is a list of priority groups this consumer supports.
|
||||
PriorityGroups []string `json:"priority_groups,omitempty"`
|
||||
|
||||
// Fields specific for push consumers:
|
||||
// Fields specific to push consumers:
|
||||
|
||||
// DeliverSubject is the subject to deliver messages to for push consumers
|
||||
DeliverSubject string `json:"deliver_subject,omitempty"`
|
||||
@@ -264,9 +264,9 @@ type (
|
||||
DeliverGroup string `json:"deliver_group,omitempty"`
|
||||
|
||||
// FlowControl is a flag to enable flow control for the consumer.
|
||||
// When set, server will regularly send an empty message with Status
|
||||
// header 100 and a reply subject, consumers must reply to these
|
||||
// messages to control the rate of message delivery
|
||||
// When set, the server will regularly send an empty message with status
|
||||
// header 100 and a reply subject. Consumers must reply to these
|
||||
// messages to control the rate of message delivery.
|
||||
FlowControl bool `json:"flow_control,omitempty"`
|
||||
|
||||
// IdleHeartbeat enables push consumer idle heartbeat messages.
|
||||
@@ -302,9 +302,9 @@ type (
|
||||
|
||||
// ReplayPolicy defines the rate at which messages are sent to the
|
||||
// consumer. If ReplayOriginalPolicy is set, messages are sent in the
|
||||
// same intervals in which they were stored on stream. This can be used
|
||||
// e.g. to simulate production traffic in development environments. If
|
||||
// ReplayInstantPolicy is set, messages are sent as fast as possible.
|
||||
// same intervals in which they were stored on the stream. This can be
|
||||
// used e.g. to simulate production traffic in development environments.
|
||||
// If ReplayInstantPolicy is set, messages are sent as fast as possible.
|
||||
// Defaults to ReplayInstantPolicy.
|
||||
ReplayPolicy ReplayPolicy `json:"replay_policy"`
|
||||
|
||||
@@ -317,12 +317,12 @@ type (
|
||||
// (and no payload). Defaults to false.
|
||||
HeadersOnly bool `json:"headers_only,omitempty"`
|
||||
|
||||
// Maximum number of attempts for the consumer to be recreated in a
|
||||
// single recreation cycle. Defaults to unlimited.
|
||||
// MaxResetAttempts is the maximum number of attempts to recreate the
|
||||
// consumer in a single recovery cycle. Defaults to unlimited.
|
||||
MaxResetAttempts int
|
||||
|
||||
// Metadata is a set of application-defined key-value pairs for
|
||||
// associating metadata on the consumer. This feature requires
|
||||
// associating metadata with the consumer. This feature requires
|
||||
// nats-server v2.10.0 or later.
|
||||
Metadata map[string]string `json:"metadata,omitempty"`
|
||||
|
||||
@@ -386,7 +386,7 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
|
||||
case jsonString("prioritized"):
|
||||
*p = PriorityPolicyPrioritized
|
||||
default:
|
||||
return fmt.Errorf("nats: can not unmarshal %q", data)
|
||||
return fmt.Errorf("nats: cannot unmarshal %q", data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -446,7 +446,7 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
|
||||
case jsonString("last_per_subject"):
|
||||
*p = DeliverLastPerSubjectPolicy
|
||||
default:
|
||||
return fmt.Errorf("nats: can not unmarshal %q", data)
|
||||
return fmt.Errorf("nats: cannot unmarshal %q", data)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -509,7 +509,7 @@ func (p *AckPolicy) UnmarshalJSON(data []byte) error {
|
||||
case jsonString("explicit"):
|
||||
*p = AckExplicitPolicy
|
||||
default:
|
||||
return fmt.Errorf("nats: can not unmarshal %q", data)
|
||||
return fmt.Errorf("nats: cannot unmarshal %q", data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -554,7 +554,7 @@ func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
|
||||
case jsonString("original"):
|
||||
*p = ReplayOriginalPolicy
|
||||
default:
|
||||
return fmt.Errorf("nats: can not unmarshal %q", data)
|
||||
return fmt.Errorf("nats: cannot unmarshal %q", data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
12
vendor/github.com/nats-io/nats.go/jetstream/errors.go
generated
vendored
12
vendor/github.com/nats-io/nats.go/jetstream/errors.go
generated
vendored
@@ -151,7 +151,7 @@ var (
|
||||
// FilterSubjects are specified when creating consumer.
|
||||
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}
|
||||
|
||||
// ErrDuplicateFilterSubjects is returned when filter subjects overlap when
|
||||
// ErrOverlappingFilterSubjects is returned when filter subjects overlap when
|
||||
// creating consumer.
|
||||
ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}
|
||||
|
||||
@@ -167,8 +167,8 @@ var (
|
||||
// already created in the server.
|
||||
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}
|
||||
|
||||
// ErrConsumerNotFound is an error returned when consumer with given name
|
||||
// does not exist.
|
||||
// ErrConsumerNameAlreadyInUse is an error returned when attempting to create
|
||||
// a consumer with a name that is already in use.
|
||||
ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}
|
||||
|
||||
// ErrNotPullConsumer is returned when attempting to fetch or create pull
|
||||
@@ -289,8 +289,8 @@ var (
|
||||
// shutdown.
|
||||
ErrServerShutdown JetStreamError = &jsError{message: "server shutdown"}
|
||||
|
||||
// ErrOrderedConsumerReset is returned when resetting ordered consumer fails
|
||||
// due to too many attempts.
|
||||
// ErrOrderedConsumerReset indicates that the ordered consumer was
|
||||
// automatically reset and recreated to preserve message ordering.
|
||||
ErrOrderedConsumerReset JetStreamError = &jsError{message: "recreating ordered consumer"}
|
||||
|
||||
// ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already
|
||||
@@ -353,7 +353,7 @@ var (
|
||||
// deleted.
|
||||
ErrKeyDeleted JetStreamError = &jsError{message: "key was deleted"}
|
||||
|
||||
// ErrHistoryToLarge is returned when provided history limit is larger than
|
||||
// ErrHistoryTooLarge is returned when provided history limit is larger than
|
||||
// 64.
|
||||
ErrHistoryTooLarge JetStreamError = &jsError{message: "history limited to a max of 64"}
|
||||
|
||||
|
||||
55
vendor/github.com/nats-io/nats.go/jetstream/jetstream.go
generated
vendored
55
vendor/github.com/nats-io/nats.go/jetstream/jetstream.go
generated
vendored
@@ -34,9 +34,9 @@ type (
|
||||
//
|
||||
// - Publishing messages to a stream using [Publisher].
|
||||
// - Managing streams using [StreamManager].
|
||||
// - Managing consumers using [StreamConsumerManager]. Those are the same
|
||||
// methods as on [Stream], but are available as a shortcut to a consumer
|
||||
// bypassing stream lookup.
|
||||
// - Managing consumers using [StreamConsumerManager]. These are the same
|
||||
// methods available on [Stream], but exposed here as a shortcut that bypasses
|
||||
// stream lookup.
|
||||
// - Managing KeyValue stores using [KeyValueManager].
|
||||
// - Managing Object Stores using [ObjectStoreManager].
|
||||
//
|
||||
@@ -95,8 +95,8 @@ type (
|
||||
// be bound to a stream) and nats.Message.
|
||||
//
|
||||
// PublishMsgAsync does not guarantee that the message has been
|
||||
// sent to the server and thus messages can be stored in the stream
|
||||
// received by the server. It only guarantees that the message has been
|
||||
// sent to the server and thus messages can be stored in the stream
|
||||
// out of order in case of retries.
|
||||
PublishMsgAsync(msg *nats.Msg, opts ...PublishOpt) (PubAckFuture, error)
|
||||
|
||||
@@ -110,7 +110,7 @@ type (
|
||||
// server.
|
||||
PublishAsyncComplete() <-chan struct{}
|
||||
|
||||
// CleanupPublisher will cleanup the publishing side of JetStreamContext.
|
||||
// CleanupPublisher will clean up the publishing side of JetStreamContext.
|
||||
//
|
||||
// This will unsubscribe from the internal reply subject if needed.
|
||||
// All pending async publishes will fail with ErrJetStreamContextClosed.
|
||||
@@ -129,7 +129,7 @@ type (
|
||||
// CreateOrUpdateStream and Stream methods return a [Stream] interface, allowing
|
||||
// to operate on a stream.
|
||||
StreamManager interface {
|
||||
// CreateStream creates a new stream with given config and returns an
|
||||
// CreateStream creates a new stream with the given config and returns an
|
||||
// interface to operate on it. If stream with given name already exists
|
||||
// and its configuration differs from the provided one,
|
||||
// ErrStreamNameAlreadyInUse is returned.
|
||||
@@ -139,7 +139,7 @@ type (
|
||||
// ErrStreamNotFound is returned.
|
||||
UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
|
||||
|
||||
// CreateOrUpdateStream creates a stream with given config. If stream
|
||||
// CreateOrUpdateStream creates a stream with the given config. If stream
|
||||
// already exists, it will be updated (if possible).
|
||||
CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
|
||||
|
||||
@@ -147,7 +147,7 @@ type (
|
||||
// If stream does not exist, ErrStreamNotFound is returned.
|
||||
Stream(ctx context.Context, stream string) (Stream, error)
|
||||
|
||||
// StreamNameBySubject returns a stream name stream listening on given
|
||||
// StreamNameBySubject returns the name of the stream that listens on the given
|
||||
// subject. If no stream is bound to given subject, ErrStreamNotFound
|
||||
// is returned.
|
||||
StreamNameBySubject(ctx context.Context, subject string) (string, error)
|
||||
@@ -156,11 +156,11 @@ type (
|
||||
// exist, ErrStreamNotFound is returned.
|
||||
DeleteStream(ctx context.Context, stream string) error
|
||||
|
||||
// ListStreams returns StreamInfoLister, enabling iterating over a
|
||||
// ListStreams returns StreamInfoLister, enabling iteration over a
|
||||
// channel of stream infos.
|
||||
ListStreams(context.Context, ...StreamListOpt) StreamInfoLister
|
||||
|
||||
// StreamNames returns a StreamNameLister, enabling iterating over a
|
||||
// StreamNames returns a StreamNameLister, enabling iteration over a
|
||||
// channel of stream names.
|
||||
StreamNames(context.Context, ...StreamListOpt) StreamNameLister
|
||||
}
|
||||
@@ -169,11 +169,11 @@ type (
|
||||
// available as a part of [JetStream] interface. This is an alternative to
|
||||
// [Stream] interface, allowing to bypass stream lookup. CreateConsumer,
|
||||
// UpdateConsumer, CreateOrUpdateConsumer and Consumer methods return a
|
||||
// [Consumer] interface, allowing to operate on a consumer (e.g. consume
|
||||
// [Consumer] interface, allowing operation on a consumer (e.g. consume
|
||||
// messages).
|
||||
StreamConsumerManager interface {
|
||||
// CreateOrUpdateConsumer creates a consumer on a given stream with
|
||||
// given config. If consumer already exists, it will be updated (if
|
||||
// the given config. If consumer already exists, it will be updated (if
|
||||
// possible). Consumer interface is returned, allowing to operate on a
|
||||
// consumer (e.g. fetch messages).
|
||||
CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
|
||||
@@ -191,10 +191,11 @@ type (
|
||||
// returned, allowing to operate on a consumer (e.g. fetch messages).
|
||||
UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
|
||||
|
||||
// OrderedConsumer returns an OrderedConsumer instance. OrderedConsumer
|
||||
// are managed by the library and provide a simple way to consume
|
||||
// messages from a stream. Ordered consumers are ephemeral in-memory
|
||||
// pull consumers and are resilient to deletes and restarts.
|
||||
// OrderedConsumer returns a client-managed ordered consumer for the given stream.
|
||||
// Ordered consumers use ephemeral pull consumers and automatically reset
|
||||
// themselves when ordering is lost. The client tracks state in memory and
|
||||
// recreates the underlying consumer as needed, making them resilient to deletes
|
||||
// and restarts while ensuring message order is preserved.
|
||||
OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error)
|
||||
|
||||
// Consumer returns an interface to an existing consumer, allowing processing
|
||||
@@ -213,7 +214,7 @@ type (
|
||||
ResumeConsumer(ctx context.Context, stream string, consumer string) (*ConsumerPauseResponse, error)
|
||||
|
||||
// CreateOrUpdatePushConsumer creates a push consumer on a given stream with
|
||||
// given config. If consumer already exists, it will be updated (if
|
||||
// the given config. If consumer already exists, it will be updated (if
|
||||
// possible). Consumer interface is returned, allowing to consume messages.
|
||||
CreateOrUpdatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error)
|
||||
|
||||
@@ -234,7 +235,7 @@ type (
|
||||
// of messages. If consumer does not exist, ErrConsumerNotFound is
|
||||
// returned.
|
||||
//
|
||||
// It returns ErrNotPushConsumer if the consumer is not a push consumer (deliver subject is not set).
|
||||
// It returns ErrNotPushConsumer if the consumer is not a push consumer (delivery subject is not set).
|
||||
PushConsumer(ctx context.Context, stream string, consumer string) (PushConsumer, error)
|
||||
}
|
||||
|
||||
@@ -565,7 +566,7 @@ func (js *jetStream) Options() JetStreamOptions {
|
||||
return opts
|
||||
}
|
||||
|
||||
// CreateStream creates a new stream with given config and returns an
|
||||
// CreateStream creates a new stream with the given config and returns an
|
||||
// interface to operate on it. If stream with given name already exists,
|
||||
// ErrStreamNameAlreadyInUse is returned.
|
||||
func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error) {
|
||||
@@ -673,7 +674,7 @@ func convertStreamConfigDomains(cfg StreamConfig) (StreamConfig, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Check sources for the same.
|
||||
// Check sources for the same conversion.
|
||||
if len(ncfg.Sources) > 0 {
|
||||
ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...)
|
||||
for i, ss := range ncfg.Sources {
|
||||
@@ -754,7 +755,7 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CreateOrUpdateStream creates a stream with given config. If stream
|
||||
// CreateOrUpdateStream creates a stream with the given config. If stream
|
||||
// already exists, it will be updated (if possible).
|
||||
func (js *jetStream) CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) {
|
||||
s, err := js.UpdateStream(ctx, cfg)
|
||||
@@ -823,7 +824,7 @@ func (js *jetStream) DeleteStream(ctx context.Context, name string) error {
|
||||
}
|
||||
|
||||
// CreateOrUpdateConsumer creates a consumer on a given stream with
|
||||
// given config. If consumer already exists, it will be updated (if
|
||||
// the given config. If consumer already exists, it will be updated (if
|
||||
// possible). Consumer interface is returned, allowing to operate on a
|
||||
// consumer (e.g. fetch messages).
|
||||
func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
|
||||
@@ -905,7 +906,7 @@ func (js *jetStream) DeleteConsumer(ctx context.Context, stream string, name str
|
||||
}
|
||||
|
||||
// CreateOrUpdatePushConsumer creates a push consumer on a given stream with
|
||||
// given config. If consumer already exists, it will be updated (if
|
||||
// the given config. If consumer already exists, it will be updated (if
|
||||
// possible). Consumer interface is returned, allowing to consume messages.
|
||||
func (js *jetStream) CreateOrUpdatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error) {
|
||||
if err := validateStreamName(stream); err != nil {
|
||||
@@ -1014,7 +1015,7 @@ func (js *jetStream) AccountInfo(ctx context.Context) (*AccountInfo, error) {
|
||||
return &resp.AccountInfo, nil
|
||||
}
|
||||
|
||||
// ListStreams returns StreamInfoLister, enabling iterating over a
|
||||
// ListStreams returns StreamInfoLister, enabling iteration over a
|
||||
// channel of stream infos.
|
||||
func (js *jetStream) ListStreams(ctx context.Context, opts ...StreamListOpt) StreamInfoLister {
|
||||
l := &streamLister{
|
||||
@@ -1068,7 +1069,7 @@ func (s *streamLister) Err() error {
|
||||
return s.err
|
||||
}
|
||||
|
||||
// StreamNames returns a StreamNameLister, enabling iterating over a
|
||||
// StreamNames returns a StreamNameLister, enabling iteration over a
|
||||
// channel of stream names.
|
||||
func (js *jetStream) StreamNames(ctx context.Context, opts ...StreamListOpt) StreamNameLister {
|
||||
l := &streamLister{
|
||||
@@ -1112,7 +1113,7 @@ func (js *jetStream) StreamNames(ctx context.Context, opts ...StreamListOpt) Str
|
||||
return l
|
||||
}
|
||||
|
||||
// StreamNameBySubject returns a stream name stream listening on given
|
||||
// StreamNameBySubject returns the name of the stream bound to the given
|
||||
// subject. If no stream is bound to given subject, ErrStreamNotFound
|
||||
// is returned.
|
||||
func (js *jetStream) StreamNameBySubject(ctx context.Context, subject string) (string, error) {
|
||||
@@ -1264,7 +1265,7 @@ func (js *jetStream) cleanupReplySub() {
|
||||
js.publisher.replySub = nil
|
||||
}
|
||||
if js.publisher.connStatusCh != nil {
|
||||
close(js.publisher.connStatusCh)
|
||||
js.conn.RemoveStatusListener(js.publisher.connStatusCh)
|
||||
js.publisher.connStatusCh = nil
|
||||
}
|
||||
js.publisher.Unlock()
|
||||
|
||||
4
vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go
generated
vendored
4
vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go
generated
vendored
@@ -250,7 +250,7 @@ type PullMaxBytes int
|
||||
|
||||
func (max PullMaxBytes) configureConsume(opts *consumeOpts) error {
|
||||
if max <= 0 {
|
||||
return fmt.Errorf("%w: max bytes must be greater then 0", ErrInvalidOption)
|
||||
return fmt.Errorf("%w: max bytes must be greater than 0", ErrInvalidOption)
|
||||
}
|
||||
opts.MaxBytes = int(max)
|
||||
return nil
|
||||
@@ -258,7 +258,7 @@ func (max PullMaxBytes) configureConsume(opts *consumeOpts) error {
|
||||
|
||||
func (max PullMaxBytes) configureMessages(opts *consumeOpts) error {
|
||||
if max <= 0 {
|
||||
return fmt.Errorf("%w: max bytes must be greater then 0", ErrInvalidOption)
|
||||
return fmt.Errorf("%w: max bytes must be greater than 0", ErrInvalidOption)
|
||||
}
|
||||
opts.MaxBytes = int(max)
|
||||
return nil
|
||||
|
||||
34
vendor/github.com/nats-io/nats.go/jetstream/kv.go
generated
vendored
34
vendor/github.com/nats-io/nats.go/jetstream/kv.go
generated
vendored
@@ -178,7 +178,7 @@ type (
|
||||
// the key value store in a streaming fashion (on a channel).
|
||||
ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error)
|
||||
|
||||
// ListKeysFiltered ListKeysWithFilters returns a KeyLister for filtered keys in the bucket.
|
||||
// ListKeysFiltered returns a KeyLister for filtered keys in the bucket.
|
||||
ListKeysFiltered(ctx context.Context, filters ...string) (KeyLister, error)
|
||||
|
||||
// History will return all historical values for the key (up to
|
||||
@@ -241,7 +241,7 @@ type (
|
||||
// subject after it's stored.
|
||||
RePublish *RePublish `json:"republish,omitempty"`
|
||||
|
||||
// Mirror defines the consiguration for mirroring another KeyValue
|
||||
// Mirror defines the configuration for mirroring another KeyValue
|
||||
// store.
|
||||
Mirror *StreamSource `json:"mirror,omitempty"`
|
||||
|
||||
@@ -328,6 +328,9 @@ type (
|
||||
|
||||
// Metadata returns the metadata associated with the bucket.
|
||||
Metadata() map[string]string
|
||||
|
||||
// Config returns the configuration for the bucket.
|
||||
Config() KeyValueConfig
|
||||
}
|
||||
|
||||
// KeyWatcher is what is returned when doing a watch. It can be used to
|
||||
@@ -831,6 +834,27 @@ func (s *KeyValueBucketStatus) LimitMarkerTTL() time.Duration {
|
||||
return s.info.Config.SubjectDeleteMarkerTTL
|
||||
}
|
||||
|
||||
// Config returns the configuration for the bucket.
|
||||
func (s *KeyValueBucketStatus) Config() KeyValueConfig {
|
||||
return KeyValueConfig{
|
||||
Bucket: s.bucket,
|
||||
Description: s.info.Config.Description,
|
||||
MaxValueSize: s.info.Config.MaxMsgSize,
|
||||
History: uint8(s.info.Config.MaxMsgsPerSubject),
|
||||
TTL: s.info.Config.MaxAge,
|
||||
MaxBytes: s.info.Config.MaxBytes,
|
||||
Storage: s.info.Config.Storage,
|
||||
Replicas: s.info.Config.Replicas,
|
||||
Placement: s.info.Config.Placement,
|
||||
RePublish: s.info.Config.RePublish,
|
||||
Mirror: s.info.Config.Mirror,
|
||||
Sources: s.info.Config.Sources,
|
||||
Compression: s.info.Config.Compression != NoCompression,
|
||||
Metadata: s.info.Config.Metadata,
|
||||
LimitMarkerTTL: s.info.Config.SubjectDeleteMarkerTTL,
|
||||
}
|
||||
}
|
||||
|
||||
// Metadata returns the metadata associated with the bucket.
|
||||
func (s *KeyValueBucketStatus) Metadata() map[string]string {
|
||||
return s.info.Config.Metadata
|
||||
@@ -1180,6 +1204,8 @@ func (w *watcher) Stop() error {
|
||||
return w.sub.Unsubscribe()
|
||||
}
|
||||
|
||||
// WatchFiltered will watch for any updates to keys that match the provided
|
||||
// key filters. It can be configured with the same options as Watch.
|
||||
func (kv *kvs) WatchFiltered(ctx context.Context, keys []string, opts ...WatchOpt) (KeyWatcher, error) {
|
||||
for _, key := range keys {
|
||||
if !searchKeyValid(key) {
|
||||
@@ -1364,7 +1390,7 @@ type keyLister struct {
|
||||
keys chan string
|
||||
}
|
||||
|
||||
// Keys will return all keys.
|
||||
// ListKeys will return all keys.
|
||||
func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error) {
|
||||
opts = append(opts, IgnoreDeletes(), MetaOnly())
|
||||
watcher, err := kv.WatchAll(ctx, opts...)
|
||||
@@ -1391,7 +1417,7 @@ func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error
|
||||
return kl, nil
|
||||
}
|
||||
|
||||
// ListKeysWithFilters returns a channel of keys matching the provided filters using WatchFiltered.
|
||||
// ListKeysFiltered returns a KeyLister for filtered keys in the bucket.
|
||||
func (kv *kvs) ListKeysFiltered(ctx context.Context, filters ...string) (KeyLister, error) {
|
||||
watcher, err := kv.WatchFiltered(ctx, filters, IgnoreDeletes(), MetaOnly())
|
||||
if err != nil {
|
||||
|
||||
10
vendor/github.com/nats-io/nats.go/jetstream/kv_options.go
generated
vendored
10
vendor/github.com/nats-io/nats.go/jetstream/kv_options.go
generated
vendored
@@ -29,7 +29,7 @@ func (opt watchOptFn) configureWatcher(opts *watchOpts) error {
|
||||
func IncludeHistory() WatchOpt {
|
||||
return watchOptFn(func(opts *watchOpts) error {
|
||||
if opts.updatesOnly {
|
||||
return fmt.Errorf("%w: include history can not be used with updates only", ErrInvalidOption)
|
||||
return fmt.Errorf("%w: include history cannot be used with updates only", ErrInvalidOption)
|
||||
}
|
||||
opts.includeHistory = true
|
||||
return nil
|
||||
@@ -41,14 +41,14 @@ func IncludeHistory() WatchOpt {
|
||||
func UpdatesOnly() WatchOpt {
|
||||
return watchOptFn(func(opts *watchOpts) error {
|
||||
if opts.includeHistory {
|
||||
return fmt.Errorf("%w: updates only can not be used with include history", ErrInvalidOption)
|
||||
return fmt.Errorf("%w: updates only cannot be used with include history", ErrInvalidOption)
|
||||
}
|
||||
opts.updatesOnly = true
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// IgnoreDeletes will have the key watcher not pass any deleted keys.
|
||||
// IgnoreDeletes will prevent the key watcher from passing any deleted keys.
|
||||
func IgnoreDeletes() WatchOpt {
|
||||
return watchOptFn(func(opts *watchOpts) error {
|
||||
opts.ignoreDeletes = true
|
||||
@@ -56,7 +56,7 @@ func IgnoreDeletes() WatchOpt {
|
||||
})
|
||||
}
|
||||
|
||||
// MetaOnly instructs the key watcher to retrieve only the entry meta data, not
|
||||
// MetaOnly instructs the key watcher to retrieve only the entry metadata, not
|
||||
// the entry value.
|
||||
func MetaOnly() WatchOpt {
|
||||
return watchOptFn(func(opts *watchOpts) error {
|
||||
@@ -121,7 +121,7 @@ func (opt createOptFn) configureCreate(opts *createOpts) error {
|
||||
}
|
||||
|
||||
// KeyTTL sets the TTL for the key. This is the time after which the key will be
|
||||
// automatically deleted. The TTL is set when the key is created and can not be
|
||||
// automatically deleted. The TTL is set when the key is created and cannot be
|
||||
// changed later. This requires LimitMarkerTTL to be enabled on the bucket.
|
||||
func KeyTTL(ttl time.Duration) KVCreateOpt {
|
||||
return createOptFn(func(opts *createOpts) error {
|
||||
|
||||
12
vendor/github.com/nats-io/nats.go/jetstream/message.go
generated
vendored
12
vendor/github.com/nats-io/nats.go/jetstream/message.go
generated
vendored
@@ -180,7 +180,7 @@ const (
|
||||
// level. Server will reject the message if it is not the case.
|
||||
//
|
||||
// This can be set when publishing messages using [WithExpectLastSequence]
|
||||
// option. option.
|
||||
// option.
|
||||
ExpectedLastSeqHeader = "Nats-Expected-Last-Sequence"
|
||||
|
||||
// ExpectedLastSubjSeqHeader contains the expected last sequence number on
|
||||
@@ -227,7 +227,7 @@ const (
|
||||
// SequenceHeader contains the original sequence number of the message.
|
||||
SequenceHeader = "Nats-Sequence"
|
||||
|
||||
// TimeStampHeader contains the original timestamp of the message.
|
||||
// TimeStampHeaer contains the original timestamp of the message.
|
||||
TimeStampHeaer = "Nats-Time-Stamp"
|
||||
|
||||
// SubjectHeader contains the original subject the message was published to.
|
||||
@@ -291,7 +291,7 @@ func (m *jetStreamMsg) Headers() nats.Header {
|
||||
return m.msg.Header
|
||||
}
|
||||
|
||||
// Subject returns a subject on which a message is published.
|
||||
// Subject returns a subject on which a message was published/received.
|
||||
func (m *jetStreamMsg) Subject() string {
|
||||
return m.msg.Subject
|
||||
}
|
||||
@@ -408,9 +408,9 @@ func (m *jetStreamMsg) checkReply() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns if the given message is a user message or not, and if
|
||||
// checkSts() is true, returns appropriate error based on the
|
||||
// content of the status (404, etc..)
|
||||
// checkMsg returns whether the given message is a user message or a control message.
|
||||
// If the status header is present, it returns an appropriate error based
|
||||
// on the status code (404, etc.)
|
||||
func checkMsg(msg *nats.Msg) (bool, error) {
|
||||
// If payload or no header, consider this a user message
|
||||
if len(msg.Data) > 0 || len(msg.Header) == 0 {
|
||||
|
||||
11
vendor/github.com/nats-io/nats.go/jetstream/object.go
generated
vendored
11
vendor/github.com/nats-io/nats.go/jetstream/object.go
generated
vendored
@@ -675,8 +675,17 @@ func (obs *obs) Put(ctx context.Context, meta ObjectMeta, r io.Reader) (*ObjectI
|
||||
return perr
|
||||
}
|
||||
|
||||
opts := []JetStreamOpt{
|
||||
WithPublishAsyncErrHandler(func(js JetStream, _ *nats.Msg, err error) { setErr(err) }),
|
||||
}
|
||||
|
||||
// if context deadline is not set, use default JetStream timeout (per publish)
|
||||
if _, ok := ctx.Deadline(); !ok {
|
||||
opts = append(opts, WithPublishAsyncTimeout(obs.js.opts.DefaultTimeout))
|
||||
}
|
||||
|
||||
// Create our own JS context to handle errors etc.
|
||||
pubJS, err := New(obs.js.conn, WithPublishAsyncErrHandler(func(js JetStream, _ *nats.Msg, err error) { setErr(err) }))
|
||||
pubJS, err := New(obs.js.conn, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
2
vendor/github.com/nats-io/nats.go/jetstream/stream.go
generated
vendored
2
vendor/github.com/nats-io/nats.go/jetstream/stream.go
generated
vendored
@@ -48,7 +48,7 @@ type (
|
||||
GetMsg(ctx context.Context, seq uint64, opts ...GetMsgOpt) (*RawStreamMsg, error)
|
||||
|
||||
// GetLastMsgForSubject retrieves the last raw stream message stored in
|
||||
// JetStream on a given subject subject.
|
||||
// JetStream on a given subject.
|
||||
GetLastMsgForSubject(ctx context.Context, subject string) (*RawStreamMsg, error)
|
||||
|
||||
// DeleteMsg deletes a message from a stream.
|
||||
|
||||
14
vendor/github.com/nats-io/nats.go/js.go
generated
vendored
14
vendor/github.com/nats-io/nats.go/js.go
generated
vendored
@@ -296,8 +296,7 @@ type jsOpts struct {
|
||||
}
|
||||
|
||||
const (
|
||||
defaultRequestWait = 5 * time.Second
|
||||
defaultAccountCheck = 20 * time.Second
|
||||
defaultRequestWait = 5 * time.Second
|
||||
)
|
||||
|
||||
// JetStream returns a JetStreamContext for messaging and stream management.
|
||||
@@ -2835,12 +2834,15 @@ func ConsumerFilterSubjects(subjects ...string) SubOpt {
|
||||
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
|
||||
sub.mu.Lock()
|
||||
// TODO(dlc) - Better way to mark especially if we attach.
|
||||
if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
|
||||
if sub.jsi.ordered {
|
||||
sub.mu.Unlock()
|
||||
if sub.jsi == nil {
|
||||
sub.mu.Unlock()
|
||||
return nil, ErrTypeSubscription
|
||||
} else if sub.jsi.consumer == _EMPTY_ {
|
||||
ordered := sub.jsi.ordered
|
||||
sub.mu.Unlock()
|
||||
if ordered {
|
||||
return nil, ErrConsumerInfoOnOrderedReset
|
||||
}
|
||||
sub.mu.Unlock()
|
||||
return nil, ErrTypeSubscription
|
||||
}
|
||||
|
||||
|
||||
21
vendor/github.com/nats-io/nats.go/kv.go
generated
vendored
21
vendor/github.com/nats-io/nats.go/kv.go
generated
vendored
@@ -105,6 +105,9 @@ type KeyValueStatus interface {
|
||||
|
||||
// IsCompressed indicates if the data is compressed on disk
|
||||
IsCompressed() bool
|
||||
|
||||
// Config returns the original configuration used to create the bucket
|
||||
Config() KeyValueConfig
|
||||
}
|
||||
|
||||
// KeyWatcher is what is returned when doing a watch.
|
||||
@@ -1208,6 +1211,24 @@ func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }
|
||||
// IsCompressed indicates if the data is compressed on disk
|
||||
func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }
|
||||
|
||||
func (s *KeyValueBucketStatus) Config() KeyValueConfig {
|
||||
return KeyValueConfig{
|
||||
Bucket: s.bucket,
|
||||
Description: s.nfo.Config.Description,
|
||||
MaxValueSize: s.nfo.Config.MaxMsgSize,
|
||||
History: uint8(s.nfo.Config.MaxMsgsPerSubject),
|
||||
TTL: s.nfo.Config.MaxAge,
|
||||
MaxBytes: s.nfo.Config.MaxBytes,
|
||||
Storage: s.nfo.Config.Storage,
|
||||
Replicas: s.nfo.Config.Replicas,
|
||||
Placement: s.nfo.Config.Placement,
|
||||
RePublish: s.nfo.Config.RePublish,
|
||||
Mirror: s.nfo.Config.Mirror,
|
||||
Sources: s.nfo.Config.Sources,
|
||||
Compression: s.nfo.Config.Compression != NoCompression,
|
||||
}
|
||||
}
|
||||
|
||||
// Status retrieves the status and configuration of a bucket
|
||||
func (kv *kvs) Status() (KeyValueStatus, error) {
|
||||
nfo, err := kv.js.StreamInfo(kv.stream)
|
||||
|
||||
372
vendor/github.com/nats-io/nats.go/nats.go
generated
vendored
372
vendor/github.com/nats-io/nats.go/nats.go
generated
vendored
@@ -48,7 +48,7 @@ import (
|
||||
|
||||
// Default Constants
|
||||
const (
|
||||
Version = "1.48.0"
|
||||
Version = "1.49.0"
|
||||
DefaultURL = "nats://127.0.0.1:4222"
|
||||
DefaultPort = 4222
|
||||
DefaultMaxReconnect = 60
|
||||
@@ -152,6 +152,8 @@ var (
|
||||
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
|
||||
ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded")
|
||||
ErrWebSocketHeadersAlreadySet = errors.New("nats: websocket connection headers already set")
|
||||
ErrServerNotInPool = errors.New("nats: selected server is not in the pool")
|
||||
ErrMixingWebsocketSchemes = errors.New("nats: mixing of websocket and non websocket URLs is not allowed")
|
||||
)
|
||||
|
||||
// GetDefaultOptions returns default configuration options for the client.
|
||||
@@ -242,7 +244,7 @@ type SignatureHandler func([]byte) ([]byte, error)
|
||||
// AuthTokenHandler is used to generate a new token.
|
||||
type AuthTokenHandler func() string
|
||||
|
||||
// UserInfoCB is used to pass the username and password when establishing connection.
|
||||
// UserInfoCB is used to pass the username and password when establishing a connection.
|
||||
type UserInfoCB func() (string, string)
|
||||
|
||||
// ReconnectDelayHandler is used to get from the user the desired
|
||||
@@ -254,6 +256,25 @@ type ReconnectDelayHandler func(attempts int) time.Duration
|
||||
// WebSocketHeadersHandler is an optional callback handler for generating token used for WebSocket connections.
|
||||
type WebSocketHeadersHandler func() (http.Header, error)
|
||||
|
||||
// ReconnectToServerHandler is used to determine the server to reconnect to during
|
||||
// the reconnection process. The handler receives a snapshot of available servers
|
||||
// in the pool and should return a pointer to one of the servers from the provided
|
||||
// slice and a delay before attempting the connection.
|
||||
//
|
||||
// Return values:
|
||||
// - *Server: The server to connect to. Must be a pointer to an element from the
|
||||
// provided servers slice. If the returned server is not in the pool, the library
|
||||
// will fire ReconnectErrCB with ErrServerNotInPool and fall back to default
|
||||
// server selection.
|
||||
// - time.Duration: The delay before attempting the connection. If zero, the
|
||||
// connection attempt is made immediately. If non-zero, the library sleeps
|
||||
// for exactly that duration before attempting.
|
||||
//
|
||||
// MaxReconnect limits are enforced automatically: servers exceeding the configured
|
||||
// MaxReconnect attempts are removed from the pool before the handler is called.
|
||||
// To disable this limit, set MaxReconnect to a negative value.
|
||||
type ReconnectToServerHandler func([]Server, ServerInfo) (*Server, time.Duration)
|
||||
|
||||
// asyncCB is used to preserve order for async callbacks.
|
||||
type asyncCB struct {
|
||||
f func()
|
||||
@@ -350,7 +371,7 @@ type Options struct {
|
||||
// Defaults to 60.
|
||||
MaxReconnect int
|
||||
|
||||
// ReconnectWait sets the time to backoff after attempting a reconnect
|
||||
// ReconnectWait sets the time to back off after attempting a reconnect
|
||||
// to a server that we were already connected to previously.
|
||||
// Defaults to 2s.
|
||||
ReconnectWait time.Duration
|
||||
@@ -377,7 +398,7 @@ type Options struct {
|
||||
// Defaults to 2s.
|
||||
Timeout time.Duration
|
||||
|
||||
// DrainTimeout sets the timeout for a Drain Operation to complete.
|
||||
// DrainTimeout sets the timeout for a drain operation to complete.
|
||||
// Defaults to 30s.
|
||||
DrainTimeout time.Duration
|
||||
|
||||
@@ -431,18 +452,25 @@ type Options struct {
|
||||
AsyncErrorCB ErrHandler
|
||||
|
||||
// ReconnectErrCB sets the callback that is invoked whenever a
|
||||
// reconnect attempt failed
|
||||
// reconnect attempt fails.
|
||||
ReconnectErrCB ConnErrHandler
|
||||
|
||||
// ReconnectToServerCB is called before reconnection attempt.
|
||||
// It is used to determine the server to reconnect to out of
|
||||
// the list of available servers.
|
||||
// If a reconnect attempt is not successful, this callback will
|
||||
// be called again before the next attempt.
|
||||
ReconnectToServerCB ReconnectToServerHandler
|
||||
|
||||
// ReconnectBufSize is the size of the backing bufio during reconnect.
|
||||
// Once this has been exhausted publish operations will return an error.
|
||||
// Defaults to 8388608 bytes (8MB).
|
||||
ReconnectBufSize int
|
||||
|
||||
// SubChanLen is the size of the buffered channel used between the socket
|
||||
// Go routine and the message delivery for SyncSubscriptions.
|
||||
// goroutine and the message delivery for SyncSubscriptions.
|
||||
// NOTE: This does not affect AsyncSubscriptions which are
|
||||
// dictated by PendingLimits()
|
||||
// dictated by PendingLimits().
|
||||
// Defaults to 65536.
|
||||
SubChanLen int
|
||||
|
||||
@@ -470,7 +498,8 @@ type Options struct {
|
||||
// Token sets the token to be used when connecting to a server.
|
||||
Token string
|
||||
|
||||
// TokenHandler designates the function used to generate the token to be used when connecting to a server.
|
||||
// TokenHandler designates the function used to generate the token
|
||||
// used when connecting to a server.
|
||||
TokenHandler AuthTokenHandler
|
||||
|
||||
// Dialer allows a custom net.Dialer when forming connections.
|
||||
@@ -532,13 +561,17 @@ type Options struct {
|
||||
// WebSocketConnectionHeaders is an optional http request headers to be sent with the WebSocket request.
|
||||
WebSocketConnectionHeaders http.Header
|
||||
|
||||
// WebSocketConnectionHeadersHandler is an optional callback handler for generating token used for WebSocket connections.
|
||||
// WebSocketConnectionHeadersHandler is an optional callback handler for generating token used for WebSocket connections.
|
||||
WebSocketConnectionHeadersHandler WebSocketHeadersHandler
|
||||
|
||||
// SkipSubjectValidation will disable publish subject validation.
|
||||
// NOTE: This is not recommended in general, as the performance gain is minimal
|
||||
// and may lead to breaking protocol.
|
||||
SkipSubjectValidation bool
|
||||
|
||||
// IgnoreDiscoveredServers will disable adding advertised server URLs
|
||||
// from INFO messages to the server pool.
|
||||
IgnoreDiscoveredServers bool
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -577,14 +610,14 @@ type Conn struct {
|
||||
// Modifying the configuration of a running Conn is a race.
|
||||
Opts Options
|
||||
wg sync.WaitGroup
|
||||
srvPool []*srv
|
||||
current *srv
|
||||
srvPool []*Server
|
||||
current *Server
|
||||
urls map[string]struct{} // Keep track of all known URLs (used by processInfo)
|
||||
conn net.Conn
|
||||
bw *natsWriter
|
||||
br *natsReader
|
||||
fch chan struct{}
|
||||
info serverInfo
|
||||
info ServerInfo
|
||||
ssid int64
|
||||
subsMu sync.RWMutex
|
||||
subs map[int64]*Subscription
|
||||
@@ -815,18 +848,27 @@ type Statistics struct {
|
||||
Reconnects uint64
|
||||
}
|
||||
|
||||
// Tracks individual backend servers.
|
||||
type srv struct {
|
||||
url *url.URL
|
||||
// Server represents a server in the pool of servers that the client can connect to.
|
||||
type Server struct {
|
||||
URL *url.URL
|
||||
Reconnects int
|
||||
didConnect bool
|
||||
reconnects int
|
||||
lastErr error
|
||||
isImplicit bool
|
||||
tlsName string
|
||||
}
|
||||
|
||||
// The INFO block received from the server.
|
||||
type serverInfo struct {
|
||||
func (s Server) clone() Server {
|
||||
c := s
|
||||
if s.URL != nil {
|
||||
u := *s.URL
|
||||
c.URL = &u
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// ServerInfo represents the information about the server that is sent in the INFO protocol message.
|
||||
type ServerInfo struct {
|
||||
ID string `json:"server_id"`
|
||||
Name string `json:"server_name"`
|
||||
Proto int `json:"proto"`
|
||||
@@ -907,7 +949,7 @@ func Name(name string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// InProcessServer is an Option that will try to establish a direction to a NATS server
|
||||
// InProcessServer is an Option that will try to establish a connection to a NATS server
|
||||
// running within the process instead of dialing via TCP.
|
||||
func InProcessServer(server InProcessConnProvider) Option {
|
||||
return func(o *Options) error {
|
||||
@@ -1087,6 +1129,19 @@ func CustomReconnectDelay(cb ReconnectDelayHandler) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// ReconnectToServer is an Option to set a custom server selection callback
|
||||
// for reconnection attempts. The callback receives a snapshot of available
|
||||
// servers and must return a server from the pool to connect to and a delay
|
||||
// duration before attempting the connection.
|
||||
//
|
||||
// See ReconnectToServerHandler for detailed documentation and usage examples.
|
||||
func ReconnectToServer(cb ReconnectToServerHandler) Option {
|
||||
return func(o *Options) error {
|
||||
o.ReconnectToServerCB = cb
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// PingInterval is an Option to set the period for client ping commands.
|
||||
// Defaults to 2m.
|
||||
func PingInterval(t time.Duration) Option {
|
||||
@@ -1522,7 +1577,7 @@ func WebSocketConnectionHeadersHandler(cb WebSocketHeadersHandler) Option {
|
||||
// By default, subject validation is performed to ensure that subjects
|
||||
// are valid according to NATS subject syntax (no spaces newlines and tabs).
|
||||
// NOTE: It is not recommended to use this option as the performance gain
|
||||
// is minimal and disabling subject validation can lead breaking protocol
|
||||
// is minimal and disabling subject validation can lead to breaking protocol
|
||||
// rules.
|
||||
func SkipSubjectValidation() Option {
|
||||
return func(o *Options) error {
|
||||
@@ -1531,6 +1586,15 @@ func SkipSubjectValidation() Option {
|
||||
}
|
||||
}
|
||||
|
||||
// IgnoreDiscoveredServers is an Option to disable adding advertised
|
||||
// server URLs from INFO messages to the server pool.
|
||||
func IgnoreDiscoveredServers() Option {
|
||||
return func(o *Options) error {
|
||||
o.IgnoreDiscoveredServers = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Handler processing
|
||||
|
||||
// SetDisconnectHandler will set the disconnect event handler.
|
||||
@@ -1790,7 +1854,7 @@ const (
|
||||
)
|
||||
|
||||
// Return the currently selected server
|
||||
func (nc *Conn) currentServer() (int, *srv) {
|
||||
func (nc *Conn) currentServer() (int, *Server) {
|
||||
for i, s := range nc.srvPool {
|
||||
if s == nil {
|
||||
continue
|
||||
@@ -1804,7 +1868,7 @@ func (nc *Conn) currentServer() (int, *srv) {
|
||||
|
||||
// Pop the current server and put onto the end of the list. Select head of list as long
|
||||
// as number of reconnect attempts under MaxReconnect.
|
||||
func (nc *Conn) selectNextServer() (*srv, error) {
|
||||
func (nc *Conn) selectNextServer() (*Server, error) {
|
||||
i, s := nc.currentServer()
|
||||
if i < 0 {
|
||||
return nil, ErrNoServers
|
||||
@@ -1813,7 +1877,7 @@ func (nc *Conn) selectNextServer() (*srv, error) {
|
||||
num := len(sp)
|
||||
copy(sp[i:num-1], sp[i+1:num])
|
||||
maxReconnect := nc.Opts.MaxReconnect
|
||||
if maxReconnect < 0 || s.reconnects < maxReconnect {
|
||||
if maxReconnect < 0 || s.Reconnects < maxReconnect {
|
||||
nc.srvPool[num-1] = s
|
||||
} else {
|
||||
nc.srvPool = sp[0 : num-1]
|
||||
@@ -1849,7 +1913,7 @@ const tlsScheme = "tls"
|
||||
// Server Options. We will randomize the server pool unless
|
||||
// the NoRandomize flag is set.
|
||||
func (nc *Conn) setupServerPool() error {
|
||||
nc.srvPool = make([]*srv, 0, srvPoolSize)
|
||||
nc.srvPool = make([]*Server, 0, srvPoolSize)
|
||||
nc.urls = make(map[string]struct{}, srvPoolSize)
|
||||
|
||||
// Create srv objects from each url string in nc.Opts.Servers
|
||||
@@ -1886,7 +1950,7 @@ func (nc *Conn) setupServerPool() error {
|
||||
|
||||
// Check for Scheme hint to move to TLS mode.
|
||||
for _, srv := range nc.srvPool {
|
||||
if srv.url.Scheme == tlsScheme || srv.url.Scheme == wsSchemeTLS {
|
||||
if srv.URL.Scheme == tlsScheme || srv.URL.Scheme == wsSchemeTLS {
|
||||
// FIXME(dlc), this is for all in the pool, should be case by case.
|
||||
nc.Opts.Secure = true
|
||||
if nc.Opts.TLSConfig == nil {
|
||||
@@ -1917,8 +1981,10 @@ func hostIsIP(u *url.URL) bool {
|
||||
return net.ParseIP(u.Hostname()) != nil
|
||||
}
|
||||
|
||||
// addURLToPool adds an entry to the server pool
|
||||
func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
|
||||
// parseServerURL parses a server URL string into a Server struct.
|
||||
// It handles scheme defaults and port defaults. Does not validate websocket consistency.
|
||||
// Returns the parsed Server and whether it's a websocket URL.
|
||||
func (nc *Conn) parseServerURL(sURL string, implicit, saveTLSName bool) (*Server, error) {
|
||||
if !strings.Contains(sURL, "://") {
|
||||
sURL = fmt.Sprintf("%s://%s", nc.connScheme(), sURL)
|
||||
}
|
||||
@@ -1929,7 +1995,7 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
|
||||
for i := 0; i < 2; i++ {
|
||||
u, err = url.Parse(sURL)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
if u.Port() != "" {
|
||||
break
|
||||
@@ -1949,19 +2015,9 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
isWS := isWebsocketScheme(u)
|
||||
// We don't support mix and match of websocket and non websocket URLs.
|
||||
// If this is the first URL, then we accept and switch the global state
|
||||
// to websocket. After that, we will know how to reject mixed URLs.
|
||||
if len(nc.srvPool) == 0 {
|
||||
nc.ws = isWS
|
||||
} else if isWS && !nc.ws || !isWS && nc.ws {
|
||||
return errors.New("mixing of websocket and non websocket URLs is not allowed")
|
||||
}
|
||||
|
||||
var tlsName string
|
||||
if implicit {
|
||||
curl := nc.current.url
|
||||
curl := nc.current.URL
|
||||
// Check to see if we do not have a url.User but current connected
|
||||
// url does. If so copy over.
|
||||
if u.User == nil && curl.User != nil {
|
||||
@@ -1975,9 +2031,29 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
s := &srv{url: u, isImplicit: implicit, tlsName: tlsName}
|
||||
s := &Server{URL: u, isImplicit: implicit, tlsName: tlsName}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// addURLToPool adds an entry to the server pool
|
||||
func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
|
||||
s, err := nc.parseServerURL(sURL, implicit, saveTLSName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We don't support mix and match of websocket and non websocket URLs.
|
||||
// If this is the first URL, then we accept and switch the global state
|
||||
// to websocket. After that, we will know how to reject mixed URLs.
|
||||
isWS := isWebsocketScheme(s.URL)
|
||||
if len(nc.srvPool) == 0 {
|
||||
nc.ws = isWS
|
||||
} else if isWS != nc.ws {
|
||||
return ErrMixingWebsocketSchemes
|
||||
}
|
||||
|
||||
nc.srvPool = append(nc.srvPool, s)
|
||||
nc.urls[u.Host] = struct{}{}
|
||||
nc.urls[s.URL.Host] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2173,7 +2249,7 @@ func (nc *Conn) createConn() (err error) {
|
||||
|
||||
// We will auto-expand host names if they resolve to multiple IPs
|
||||
hosts := []string{}
|
||||
u := nc.current.url
|
||||
u := nc.current.URL
|
||||
|
||||
if !nc.Opts.SkipHostLookup && net.ParseIP(u.Hostname()) == nil {
|
||||
addrs, _ := net.LookupHost(u.Hostname())
|
||||
@@ -2259,7 +2335,7 @@ func (nc *Conn) makeTLSConn() error {
|
||||
if nc.current.tlsName != _EMPTY_ {
|
||||
tlsCopy.ServerName = nc.current.tlsName
|
||||
} else {
|
||||
h, _, _ := net.SplitHostPort(nc.current.url.Host)
|
||||
h, _, _ := net.SplitHostPort(nc.current.URL.Host)
|
||||
tlsCopy.ServerName = h
|
||||
}
|
||||
}
|
||||
@@ -2357,7 +2433,7 @@ func (nc *Conn) ConnectedUrl() string {
|
||||
if nc.status != CONNECTED {
|
||||
return _EMPTY_
|
||||
}
|
||||
return nc.current.url.String()
|
||||
return nc.current.URL.String()
|
||||
}
|
||||
|
||||
// ConnectedUrlRedacted reports the connected server's URL with passwords redacted
|
||||
@@ -2372,7 +2448,7 @@ func (nc *Conn) ConnectedUrlRedacted() string {
|
||||
if nc.status != CONNECTED {
|
||||
return _EMPTY_
|
||||
}
|
||||
return nc.current.url.Redacted()
|
||||
return nc.current.URL.Redacted()
|
||||
}
|
||||
|
||||
// ConnectedAddr returns the connected server's IP
|
||||
@@ -2587,7 +2663,7 @@ func (nc *Conn) connect() (bool, error) {
|
||||
|
||||
if err == nil {
|
||||
nc.current.didConnect = true
|
||||
nc.current.reconnects = 0
|
||||
nc.current.Reconnects = 0
|
||||
nc.current.lastErr = nil
|
||||
break
|
||||
} else {
|
||||
@@ -2704,7 +2780,7 @@ func (nc *Conn) sendProto(proto string) {
|
||||
func (nc *Conn) connectProto() (string, error) {
|
||||
o := nc.Opts
|
||||
var nkey, sig, user, pass, token, ujwt string
|
||||
u := nc.current.url.User
|
||||
u := nc.current.URL.User
|
||||
if u != nil {
|
||||
// if no password, assume username is authToken
|
||||
if _, ok := u.Password(); !ok {
|
||||
@@ -2999,17 +3075,88 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
|
||||
}
|
||||
|
||||
for i := 0; len(nc.srvPool) > 0; {
|
||||
cur, err := nc.selectNextServer()
|
||||
if err != nil {
|
||||
nc.err = err
|
||||
break
|
||||
var err error
|
||||
var cur *Server
|
||||
var callbackDelay time.Duration
|
||||
var useCallbackDelay bool
|
||||
|
||||
if nc.Opts.ReconnectToServerCB != nil {
|
||||
// Enforce MaxReconnect limits before calling the callback
|
||||
maxReconnect := nc.Opts.MaxReconnect
|
||||
if maxReconnect >= 0 {
|
||||
// Remove servers that have exceeded MaxReconnect attempts
|
||||
filtered := make([]*Server, 0, len(nc.srvPool))
|
||||
for _, srv := range nc.srvPool {
|
||||
if srv != nil && srv.Reconnects < maxReconnect {
|
||||
filtered = append(filtered, srv)
|
||||
}
|
||||
}
|
||||
nc.srvPool = filtered
|
||||
}
|
||||
|
||||
// Check if we still have servers after filtering
|
||||
if len(nc.srvPool) == 0 {
|
||||
nc.err = ErrNoServers
|
||||
break
|
||||
}
|
||||
|
||||
// Copy server values to avoid caller modifying internal state
|
||||
srvVals := make([]Server, len(nc.srvPool))
|
||||
for idx, srv := range nc.srvPool {
|
||||
if srv != nil {
|
||||
srvVals[idx] = srv.clone()
|
||||
}
|
||||
}
|
||||
|
||||
var selectedSrv *Server
|
||||
selectedSrv, callbackDelay = nc.Opts.ReconnectToServerCB(srvVals, nc.info)
|
||||
if selectedSrv != nil {
|
||||
idx := slices.IndexFunc(nc.srvPool, func(srv *Server) bool {
|
||||
return srv != nil && srv.URL.String() == selectedSrv.URL.String()
|
||||
})
|
||||
if idx != -1 {
|
||||
cur = nc.srvPool[idx]
|
||||
nc.current = cur
|
||||
useCallbackDelay = true
|
||||
} else if reconnectErrCB := nc.Opts.ReconnectErrCB; reconnectErrCB != nil {
|
||||
nc.ach.push(func() { reconnectErrCB(nc, ErrServerNotInPool) })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var doSleep bool
|
||||
if cur == nil {
|
||||
cur, err = nc.selectNextServer()
|
||||
if err != nil {
|
||||
nc.err = err
|
||||
break
|
||||
}
|
||||
doSleep = i+1 >= len(nc.srvPool) && !forceReconnect
|
||||
}
|
||||
|
||||
doSleep := i+1 >= len(nc.srvPool) && !forceReconnect
|
||||
forceReconnect = false
|
||||
nc.mu.Unlock()
|
||||
|
||||
if !doSleep {
|
||||
if useCallbackDelay {
|
||||
if callbackDelay > 0 {
|
||||
i = 0
|
||||
if rt == nil {
|
||||
rt = time.NewTimer(callbackDelay)
|
||||
} else {
|
||||
rt.Reset(callbackDelay)
|
||||
}
|
||||
select {
|
||||
case <-rqch:
|
||||
rt.Stop()
|
||||
nc.mu.Lock()
|
||||
nc.rqch = make(chan struct{})
|
||||
nc.mu.Unlock()
|
||||
case <-rt.C:
|
||||
}
|
||||
} else {
|
||||
runtime.Gosched()
|
||||
}
|
||||
} else if !doSleep {
|
||||
i++
|
||||
// Release the lock to give a chance to a concurrent nc.Close() to break the loop.
|
||||
runtime.Gosched()
|
||||
@@ -3055,7 +3202,7 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
|
||||
}
|
||||
|
||||
// Mark that we tried a reconnect
|
||||
cur.reconnects++
|
||||
cur.Reconnects++
|
||||
|
||||
// Try to create a new connection
|
||||
err = nc.createConn()
|
||||
@@ -3090,7 +3237,7 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
|
||||
|
||||
// Clear out server stats for the server we connected to..
|
||||
cur.didConnect = true
|
||||
cur.reconnects = 0
|
||||
cur.Reconnects = 0
|
||||
|
||||
// Send existing subscription state
|
||||
nc.resendSubscriptions()
|
||||
@@ -3758,7 +3905,7 @@ func (nc *Conn) processInfo(info string) error {
|
||||
if info == _EMPTY_ {
|
||||
return nil
|
||||
}
|
||||
var ncInfo serverInfo
|
||||
var ncInfo ServerInfo
|
||||
if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -3769,7 +3916,7 @@ func (nc *Conn) processInfo(info string) error {
|
||||
// if advertise is disabled on that server, or servers that
|
||||
// did not include themselves in the async INFO protocol.
|
||||
// If empty, do not remove the implicit servers from the pool.
|
||||
if len(nc.info.ConnectURLs) == 0 {
|
||||
if len(nc.info.ConnectURLs) == 0 || nc.Opts.IgnoreDiscoveredServers {
|
||||
if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
|
||||
nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
|
||||
}
|
||||
@@ -3792,7 +3939,7 @@ func (nc *Conn) processInfo(info string) error {
|
||||
sp := nc.srvPool
|
||||
for i := 0; i < len(sp); i++ {
|
||||
srv := sp[i]
|
||||
curl := srv.url.Host
|
||||
curl := srv.URL.Host
|
||||
// Check if this URL is in the INFO protocol
|
||||
_, inInfo := tmp[curl]
|
||||
// Remove from the temp map so that at the end we are left with only
|
||||
@@ -3800,7 +3947,7 @@ func (nc *Conn) processInfo(info string) error {
|
||||
delete(tmp, curl)
|
||||
// Keep servers that were set through Options, but also the one that
|
||||
// we are currently connected to (even if it is a discovered server).
|
||||
if !srv.isImplicit || srv.url == nc.current.url {
|
||||
if !srv.isImplicit || srv.URL == nc.current.URL {
|
||||
continue
|
||||
}
|
||||
if !inInfo {
|
||||
@@ -3812,7 +3959,7 @@ func (nc *Conn) processInfo(info string) error {
|
||||
}
|
||||
}
|
||||
// Figure out if we should save off the current non-IP hostname if we encounter a bare IP.
|
||||
saveTLS := nc.current != nil && !hostIsIP(nc.current.url)
|
||||
saveTLS := nc.current != nil && !hostIsIP(nc.current.URL)
|
||||
|
||||
// If there are any left in the tmp map, these are new (or restarted) servers
|
||||
// and need to be added to the pool.
|
||||
@@ -4765,7 +4912,7 @@ func (s *Subscription) IsValid() bool {
|
||||
//
|
||||
// For a JetStream subscription, if the library has created the JetStream
|
||||
// consumer, the library will send a DeleteConsumer request to the server
|
||||
// when the Drain operation completes. If a failure occurs when deleting
|
||||
// when the drain operation completes. If a failure occurs when deleting
|
||||
// the JetStream consumer, an error will be reported to the asynchronous
|
||||
// error callback.
|
||||
// If you do not wish the JetStream consumer to be automatically deleted,
|
||||
@@ -5915,7 +6062,7 @@ func (nc *Conn) getServers(implicitOnly bool) []string {
|
||||
if implicitOnly && !nc.srvPool[i].isImplicit {
|
||||
continue
|
||||
}
|
||||
url := nc.srvPool[i].url
|
||||
url := nc.srvPool[i].URL
|
||||
servers = append(servers, fmt.Sprintf("%s://%s", url.Scheme, url.Host))
|
||||
}
|
||||
return servers
|
||||
@@ -6071,6 +6218,103 @@ func (nc *Conn) Barrier(f func()) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ServerPool returns a copy of the current server pool for the connection.
|
||||
//
|
||||
// This function should not be called from within connection callbacks to avoid
|
||||
// potential deadlocks.
|
||||
func (nc *Conn) ServerPool() []Server {
|
||||
nc.mu.RLock()
|
||||
defer nc.mu.RUnlock()
|
||||
servers := make([]Server, len(nc.srvPool))
|
||||
for i, srv := range nc.srvPool {
|
||||
if srv != nil {
|
||||
// Return a copy to avoid exposing internal state
|
||||
servers[i] = srv.clone()
|
||||
}
|
||||
}
|
||||
return servers
|
||||
}
|
||||
|
||||
// SetServerPool allows updating the server pool for the connection. This
|
||||
// replaces the existing pool with the provided list of server URLs. If the
|
||||
// current server is not in the new pool, the client will switch to a server in
|
||||
// the new pool on the next reconnect attempt. This function is thread-safe and
|
||||
// can be called while the connection is active. It will return an error if the
|
||||
// connection is closed or if any of the provided URLs are invalid.
|
||||
//
|
||||
// This function does not trigger an immediate reconnect. The new server
|
||||
// pool will be used on the next reconnect attempt.
|
||||
// If you want to trigger an immediate reconnect to apply the new server pool,
|
||||
// you can call [Conn.ForceReconnect] after this function.
|
||||
//
|
||||
// Unless [IgnoreDiscoveredServers] is used true, the client will continue to
|
||||
// discover and add new servers to the pool as it receives INFO messages from
|
||||
// the server. If you want to prevent this behavior and only use the servers
|
||||
// provided in SetServerPool, use [IgnoreDiscoveredServers].
|
||||
//
|
||||
// This function should not be called from within connection callbacks to avoid
|
||||
// potential deadlocks.
|
||||
func (nc *Conn) SetServerPool(servers []string) error {
|
||||
nc.mu.Lock()
|
||||
defer nc.mu.Unlock()
|
||||
if nc.isClosed() {
|
||||
return ErrConnectionClosed
|
||||
}
|
||||
|
||||
// Parse and validate all URLs first (without modifying state)
|
||||
newPool := make([]*Server, 0, len(servers))
|
||||
newURLs := make(map[string]struct{})
|
||||
|
||||
for _, addr := range servers {
|
||||
s, err := nc.parseServerURL(addr, false, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if isWebsocketScheme(s.URL) != nc.ws {
|
||||
return ErrMixingWebsocketSchemes
|
||||
}
|
||||
|
||||
newPool = append(newPool, s)
|
||||
newURLs[s.URL.Host] = struct{}{}
|
||||
}
|
||||
|
||||
// Preserve state from existing pool entries
|
||||
for _, newSrv := range newPool {
|
||||
if idx := slices.IndexFunc(nc.srvPool, func(oldSrv *Server) bool {
|
||||
return oldSrv != nil && oldSrv.URL.String() == newSrv.URL.String()
|
||||
}); idx != -1 {
|
||||
newSrv.Reconnects = nc.srvPool[idx].Reconnects
|
||||
newSrv.didConnect = nc.srvPool[idx].didConnect
|
||||
newSrv.lastErr = nc.srvPool[idx].lastErr
|
||||
}
|
||||
}
|
||||
|
||||
nc.srvPool = newPool
|
||||
nc.urls = newURLs
|
||||
|
||||
// Update nc.current to point to the corresponding server in the new pool
|
||||
// This is important because currentServer() uses pointer equality
|
||||
if nc.current != nil {
|
||||
currentURL := nc.current.URL.String()
|
||||
found := false
|
||||
for _, s := range newPool {
|
||||
if s.URL.String() == currentURL {
|
||||
// Update nc.current to point to the server instance in the new pool
|
||||
nc.current = s
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found && len(newPool) > 0 {
|
||||
// Current server not in new pool - point to first server in new pool
|
||||
// This ensures selectNextServer() can find it and properly rotate
|
||||
nc.current = newPool[0]
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetClientIP returns the client IP as known by the server.
|
||||
// Supported as of server version 2.1.6.
|
||||
func (nc *Conn) GetClientIP() (net.IP, error) {
|
||||
@@ -6133,8 +6377,8 @@ func (nc *Conn) RemoveStatusListener(ch chan (Status)) {
|
||||
}
|
||||
|
||||
for _, listeners := range nc.statListeners {
|
||||
for l := range listeners {
|
||||
delete(listeners, l)
|
||||
for range listeners {
|
||||
delete(listeners, ch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
4
vendor/modules.txt
vendored
4
vendor/modules.txt
vendored
@@ -1180,8 +1180,8 @@ github.com/nats-io/nats-server/v2/server/stree
|
||||
github.com/nats-io/nats-server/v2/server/sysmem
|
||||
github.com/nats-io/nats-server/v2/server/thw
|
||||
github.com/nats-io/nats-server/v2/server/tpm
|
||||
# github.com/nats-io/nats.go v1.48.0
|
||||
## explicit; go 1.23.0
|
||||
# github.com/nats-io/nats.go v1.49.0
|
||||
## explicit; go 1.24.0
|
||||
github.com/nats-io/nats.go
|
||||
github.com/nats-io/nats.go/encoders/builtin
|
||||
github.com/nats-io/nats.go/internal/parser
|
||||
|
||||
Reference in New Issue
Block a user