Compare commits

..

1 Commits

Author SHA1 Message Date
opencloudeu
49a018e973 [tx] updated from transifex 2026-02-24 00:12:39 +00:00
24 changed files with 236 additions and 1286 deletions

2
go.mod
View File

@@ -56,7 +56,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.12.4
github.com/nats-io/nats.go v1.49.0
github.com/nats-io/nats.go v1.48.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.1.3
github.com/onsi/ginkgo v1.16.5

4
go.sum
View File

@@ -916,8 +916,8 @@ github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts=
github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg=
github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE=
github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

View File

@@ -4,16 +4,16 @@
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
# Translators:
# ii kaka, 2025
# iikaka88, 2025
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-02-03 00:13+0000\n"
"POT-Creation-Date: 2026-02-24 00:11+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: ii kaka, 2025\n"
"Last-Translator: iikaka88, 2025\n"
"Language-Team: Japanese (https://app.transifex.com/opencloud-eu/teams/204053/ja/)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"

View File

@@ -4,16 +4,16 @@
# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
#
# Translators:
# ii kaka, 2025
# iikaka88, 2025
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-02-03 00:13+0000\n"
"POT-Creation-Date: 2026-02-24 00:11+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: ii kaka, 2025\n"
"Last-Translator: iikaka88, 2025\n"
"Language-Team: Japanese (https://app.transifex.com/opencloud-eu/teams/204053/ja/)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"

View File

@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
go get github.com/nats-io/nats.go@latest
# To get a specific version:
go get github.com/nats-io/nats.go@v1.49.0
go get github.com/nats-io/nats.go@v1.48.0
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest
@@ -134,7 +134,7 @@ The simplest form is to use the helper method UserCredentials(credsFilepath).
nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))
```
The helper method creates two callback handlers to present the user JWT and sign the nonce challenge from the server.
The helper methods creates two callback handlers to present the user JWT and sign the nonce challenge from the server.
The core client library never has direct access to your private key and simply performs the callback for signing the server challenge.
The helper will load and wipe and erase memory it uses for each connect or reconnect.
@@ -177,7 +177,7 @@ nc, err := nats.Connect("tls://nats.demo.io:4443")
// We provide a helper method to make this case easier.
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))
// If the server requires client certificate, there is a helper function for that too:
// If the server requires client certificate, there is an helper function for that too:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)
@@ -210,17 +210,17 @@ if err != nil {
// "*" matches any token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})
nc.Subscribe("foo.bar.*", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})
// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})
// Matches all of the above
@@ -237,7 +237,7 @@ nc.Publish("foo.bar.baz", []byte("Hello World"))
// Normal subscribers will continue to work as expected.
nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
received += 1
received += 1;
})
```
@@ -267,9 +267,9 @@ fmt.Println("All clear!")
// FlushTimeout specifies a timeout value as well.
err := nc.FlushTimeout(1*time.Second)
if err != nil {
fmt.Println("Flushed timed out!")
} else {
fmt.Println("All clear!")
} else {
fmt.Println("Flushed timed out!")
}
// Auto-unsubscribe after MAX_WANTED messages received
@@ -285,7 +285,7 @@ nc1.Subscribe("foo", func(m *Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})
nc2.Publish("foo", []byte("Hello World!"))
nc2.Publish("foo", []byte("Hello World!"));
```
@@ -339,7 +339,7 @@ nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))
// For token based authentication:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))
// You can even pass the two at the same time in case one of the servers
// You can even pass the two at the same time in case one of the server
// in the mesh requires token instead of user name and password.
nc, err = nats.Connect("nats://localhost:4222",
nats.UserInfo("foo", "bar"),
@@ -372,7 +372,7 @@ msg, err := sub.NextMsgWithContext(ctx)
```
## Backward compatibility
## Backwards compatibility
In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard go compatibility guidelines.
However, it's important to clarify our stance on certain types of changes:

View File

@@ -4,19 +4,19 @@ go 1.24.0
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.18.2
github.com/klauspost/compress v1.18.0
github.com/nats-io/jwt/v2 v2.8.0
github.com/nats-io/nats-server/v2 v2.12.3
github.com/nats-io/nkeys v0.4.12
github.com/nats-io/nats-server/v2 v2.12.0
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)
require (
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op // indirect
github.com/google/go-tpm v0.9.7 // indirect
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
golang.org/x/crypto v0.46.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/time v0.14.0 // indirect
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op // indirect
github.com/google/go-tpm v0.9.5 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/time v0.13.0 // indirect
)

View File

@@ -1,5 +1,5 @@
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM=
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0=
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@@ -12,27 +12,27 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA=
github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs=
github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74=
github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU=
golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@@ -1,749 +0,0 @@
# Migrating from Legacy JetStream API to `jetstream` Package
This guide helps you migrate from the legacy JetStream API in the `nats` package
(`nats.JetStreamContext`) to the new `jetstream` package
(`github.com/nats-io/nats.go/jetstream`).
- [Why Migrate?](#why-migrate)
- [Getting Started](#getting-started)
- [Stream Management](#stream-management)
- [Consumer Management](#consumer-management)
- [Publishing](#publishing)
- [Consuming Messages](#consuming-messages)
- [Replacing js.Subscribe()](#replacing-jssubscribe)
- [Replacing js.PullSubscribe()](#replacing-jspullsubscribe)
- [Ordered Consumers](#ordered-consumers)
- [Push Consumers](#push-consumers)
- [Subscription Options Mapping](#subscription-options-mapping)
- [Error Handling in Consume/Messages](#error-handling-in-consumemessages)
- [Message Acknowledgement](#message-acknowledgement)
- [KeyValue Store](#keyvalue-store)
- [Object Store](#object-store)
## Why Migrate?
The legacy JetStream API (`nats.JetStreamContext`) is deprecated. The `jetstream`
package provides a cleaner, more predictable API with several key improvements:
- **Explicit resource management.** Streams and consumers are created and managed
explicitly. The legacy `js.Subscribe()` implicitly created consumers behind
the scenes, leading to surprising behavior.
- **Pull consumers as the default.** Pull consumers with `Consume()` and
`Messages()` provide the same continuous message delivery as the legacy push-based
`Subscribe()`, but with better flow control and no slow consumer issues.
- **`context.Context` throughout.** All API calls accept `context.Context` for
timeout and cancellation, replacing the mix of `MaxWait`, `AckWait`, and
`Context()` options.
- **Clear interface separation.** Instead of one large `JetStreamContext` interface,
functionality is split across focused interfaces: `JetStream`, `Stream` and
`Consumer`.
## Getting Started
The core NATS connection remains unchanged. Only the JetStream initialization
differs:
```go
import (
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
nc, _ := nats.Connect(nats.DefaultURL)
```
**Legacy:**
```go
js, _ := nc.JetStream()
// With domain
js, _ := nc.JetStream(nats.Domain("hub"))
// With custom API prefix
js, _ := nc.JetStream(nats.APIPrefix("myprefix"))
```
**New:**
```go
js, _ := jetstream.New(nc)
// With domain
js, _ := jetstream.NewWithDomain(nc, "hub")
// With custom API prefix
js, _ := jetstream.NewWithAPIPrefix(nc, "myprefix")
```
### Initialization Options
| Legacy | New |
|-----------------------------------|--------------------------------------------|
| `nats.Domain(domain)` | `jetstream.NewWithDomain(nc, domain)` |
| `nats.APIPrefix(prefix)` | `jetstream.NewWithAPIPrefix(nc, prefix)` |
| `nats.PublishAsyncMaxPending(n)` | `jetstream.WithPublishAsyncMaxPending(n)` |
| `nats.PublishAsyncErrHandler(cb)` | `jetstream.WithPublishAsyncErrHandler(cb)` |
## Stream Management
`StreamConfig` is essentially the same struct — it just lives in the `jetstream`
package now. The new API takes `StreamConfig` by value (not pointer) and
management methods return a `Stream` handle instead of `*StreamInfo`.
| Legacy | New | Notes |
|---------------------------------|-------------------------------------|-------------------------------------------------------------------|
| `js.AddStream(cfg)` | `js.CreateStream(ctx, cfg)` | Also: `CreateOrUpdateStream()` |
| `js.UpdateStream(cfg)` | `js.UpdateStream(ctx, cfg)` | |
| `js.DeleteStream(name)` | `js.DeleteStream(ctx, name)` | |
| `js.StreamInfo(name)` | `s.Info(ctx)` / `s.CachedInfo()` | Get stream handle first via `js.Stream(ctx, name)` |
| `js.PurgeStream(name, opts...)` | `s.Purge(ctx, opts...)` | Options: `WithPurgeSubject`, `WithPurgeSequence`, `WithPurgeKeep` |
| `js.GetMsg(name, seq)` | `s.GetMsg(ctx, seq)` | |
| `js.GetLastMsg(name, subj)` | `s.GetLastMsgForSubject(ctx, subj)` | |
| `js.DeleteMsg(name, seq)` | `s.DeleteMsg(ctx, seq)` | Also: `s.SecureDeleteMsg()` |
| `js.Streams()` | `js.ListStreams(ctx)` | Returns lister with `.Info()` channel and `.Err()` |
| `js.StreamNames()` | `js.StreamNames(ctx)` | Returns lister with `.Name()` channel and `.Err()` |
The key architectural difference is that stream-specific operations (purge, get/delete
messages) now live on the `Stream` interface instead of the top-level context. Get
a stream handle first, then operate on it:
```go
s, _ := js.Stream(ctx, "ORDERS")
s.Purge(ctx)
msg, _ := s.GetMsg(ctx, 100)
```
## Consumer Management
The biggest conceptual change: in the legacy API, `js.Subscribe()` would
implicitly create consumers. In the new API, consumer creation is always explicit
and separate from message consumption.
| Legacy | New | Notes |
|------------------------------------------|----------------------------------------|----------------------------------------------------------------|
| `js.AddConsumer(stream, cfg)` | `js.CreateConsumer(ctx, stream, cfg)` | Also: `CreateOrUpdateConsumer()`, `UpdateConsumer()` |
| `js.Subscribe(subj, handler)` (implicit) | No equivalent | Must create consumer explicitly first |
| `js.ConsumerInfo(stream, name)` | `cons.Info(ctx)` / `cons.CachedInfo()` | Get consumer handle first via `js.Consumer(ctx, stream, name)` |
| `js.DeleteConsumer(stream, name)` | `js.DeleteConsumer(ctx, stream, name)` | |
| `js.Consumers(stream)` | `s.ListConsumers(ctx)` | Returns lister with `.Info()` channel and `.Err()` |
| `js.ConsumerNames(stream)` | `s.ConsumerNames(ctx)` | Returns lister with `.Name()` channel and `.Err()` |
Consumer management is available at two levels:
- On `JetStream` — requires stream name as parameter (e.g. `js.CreateConsumer(ctx, "ORDERS", cfg)`), bypassing the need to fetch a stream
- On `Stream` — no stream name needed (e.g. `s.CreateConsumer(ctx, cfg)`)
The new API provides three creation methods:
- `CreateConsumer` — fails if the consumer already exists with different config
- `UpdateConsumer` - fails if the consumer does not exist
- `CreateOrUpdateConsumer` — creates or updates as needed
**Additional notes on consumer behavior:**
- The default ack policy changed between the APIs. In the legacy API,
`AddConsumer()` defaulted to `AckNone`. In the new API, the default is
`AckExplicit`.
- In the legacy API, `sub.Unsubscribe()` on an implicitly created
consumer would automatically delete that consumer on the server. The new API
does not perform any automatic cleanup - consumers must be deleted explicitly
via `DeleteConsumer()`, or via `InactiveThreshold` on the consumer
config to let the server remove it automatically after a period of inactivity.
Push consumers use separate methods: `CreatePushConsumer`, `CreateOrUpdatePushConsumer`,
`UpdatePushConsumer`, and `PushConsumer` (for getting a handle).
```go
s, _ := js.Stream(ctx, "ORDERS")
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "processor",
})
```
## Publishing
Publishing is largely the same, with the addition of `context.Context` for
synchronous operations.
### Synchronous Publish
**Legacy:**
```go
ack, _ := js.Publish("ORDERS.new", []byte("hello"))
ack, _ = js.PublishMsg(&nats.Msg{
Subject: "ORDERS.new",
Data: []byte("hello"),
})
```
**New:**
```go
ack, _ := js.Publish(ctx, "ORDERS.new", []byte("hello"))
ack, _ = js.PublishMsg(ctx, &nats.Msg{
Subject: "ORDERS.new",
Data: []byte("hello"),
})
```
### Async Publish
**Legacy:**
```go
ackF, _ := js.PublishAsync("ORDERS.new", []byte("hello"))
select {
case ack := <-ackF.Ok():
fmt.Println(ack.Sequence)
case err := <-ackF.Err():
fmt.Println(err)
}
// Wait for all pending acks
<-js.PublishAsyncComplete()
```
**New:**
```go
// Async publish does not take context (returns immediately)
ackF, _ := js.PublishAsync("ORDERS.new", []byte("hello"))
select {
case ack := <-ackF.Ok():
fmt.Println(ack.Sequence)
case err := <-ackF.Err():
fmt.Println(err)
}
<-js.PublishAsyncComplete()
```
### Publish Options
| Legacy | New |
|------------------------------------------|---------------------------------------------------|
| `nats.MsgId(id)` | `jetstream.WithMsgID(id)` |
| `nats.ExpectStream(name)` | `jetstream.WithExpectStream(name)` |
| `nats.ExpectLastSequence(seq)` | `jetstream.WithExpectLastSequence(seq)` |
| `nats.ExpectLastSequencePerSubject(seq)` | `jetstream.WithExpectLastSequencePerSubject(seq)` |
| `nats.ExpectLastMsgId(id)` | `jetstream.WithExpectLastMsgID(id)` |
| `nats.RetryWait(dur)` | `jetstream.WithRetryWait(dur)` |
| `nats.RetryAttempts(n)` | `jetstream.WithRetryAttempts(n)` |
| `nats.StallWait(dur)` | `jetstream.WithStallWait(dur)` |
## Consuming Messages
This is the most significant area of change. The legacy API offered many
subscription flavors (`Subscribe`, `SubscribeSync`, `QueueSubscribe`,
`ChanSubscribe`, `PullSubscribe`) that blurred the line between consumer
creation, stream lookup and message consumption. The new API separates these
concerns: first create a consumer, then choose how to receive messages.
With the exception of PullSubscribe, all legacy subscription flavors utilized push consumers under the hood. The new API recommends pull consumers for all use cases, as they provide better flow control and no risk of slow consumer issues. Pull-based consumption is available via `Consume()` and `Messages()`, which maintain persistent pull subscriptions with pre-buffering for efficient continuous delivery. Push consumers are still supported for users who prefer that model, but pull consumers are the recommended default.
### Replacing `js.Subscribe()`
The legacy `js.Subscribe()` created a push consumer behind the scenes (unless
explicitly specified otherwise via `nats.Bind()` or `nats.Durable()`) and
delivered messages either via a callback. In the new API, the recommended
replacement is a **pull consumer** with `Consume()` or `Messages()`. These
provide the same continuous delivery with better flow control.
#### Legacy: callback subscription
```go
sub, _ := js.Subscribe("ORDERS.*", func(msg *nats.Msg) {
fmt.Printf("Received: %s\n", string(msg.Data))
msg.Ack()
}, nats.Durable("processor"), nats.ManualAck)
defer sub.Unsubscribe()
```
#### New: callback with `Consume()`
`Consume()` is the closest equivalent to `js.Subscribe()` — it delivers messages
to a callback function continuously.
```go
s, _ := js.Stream(ctx, "ORDERS")
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "processor",
FilterSubject: "ORDERS.*",
})
cc, _ := cons.Consume(func(msg jetstream.Msg) {
fmt.Printf("Received: %s\n", string(msg.Data()))
msg.Ack()
})
defer cc.Stop()
```
> Note: `ManualAck()` is not needed — messages are never auto-acknowledged in
> the new API.
#### New: iterator with `Messages()`
`Messages()` provides an iterator-based approach, useful when you want explicit
control over when the next message is fetched.
```go
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "processor",
FilterSubject: "ORDERS.*",
})
iter, _ := cons.Messages()
for {
msg, err := iter.Next()
if err != nil {
// handle error
}
fmt.Printf("Received: %s\n", string(msg.Data()))
msg.Ack()
}
// Call iter.Stop() when done
```
Both `Consume()` and `Messages()` maintain overlapping pull requests to the
server, providing efficient continuous delivery without gaps.
#### Legacy: synchronous subscription
```go
sub, _ := js.SubscribeSync("ORDERS.*", nats.Durable("processor"))
msg, _ := sub.NextMsg(time.Second)
```
**New:** Use `Messages()` and call `Next()`:
```go
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "processor",
FilterSubject: "ORDERS.*",
})
iter, _ := cons.Messages()
msg, _ := iter.Next()
```
#### Legacy: queue subscription
```go
// Multiple instances share work via a queue group
sub, _ := js.QueueSubscribe("ORDERS.*", "workers", handler,
nats.Durable("processor"))
```
**New with pull consumers:** With pull consumers, there is no need for an
explicit queue group. Multiple application instances (or goroutines) calling
`Consume()` or `Messages()` on the same durable consumer will naturally
distribute messages among themselves — the server tracks pending acknowledgements
and avoids delivering the same message to multiple consumers:
```go
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "processor",
})
cc, _ := cons.Consume(handler)
defer cc.Stop()
```
**New with push consumers:** If you need push-based queue semantics, set
`DeliverGroup` on a push consumer — this is the direct equivalent of the legacy
queue group:
```go
cons, _ := s.CreateOrUpdatePushConsumer(ctx, jetstream.ConsumerConfig{
Durable: "processor",
DeliverSubject: "deliver.orders",
DeliverGroup: "workers",
})
cc, _ := cons.Consume(handler)
defer cc.Stop()
```
> **Note:** Push consumers with `DeliverGroup` cannot be flow controlled. If you
> experience slow consumer issues, consider using pull-based consumers instead —
> multiple instances on the same durable consumer achieve the same work
> distribution without the slow consumer risk.
#### Legacy: channel subscription
```go
ch := make(chan *nats.Msg, 64)
sub, _ := js.ChanSubscribe("ORDERS.*", ch, nats.Durable("processor"))
for msg := range ch {
msg.Ack()
}
```
**New:** There is no direct channel-based equivalent. Use `Consume()` or
`Messages()` instead.
### Replacing `js.PullSubscribe()`
The legacy pull subscription required creating a subscription and then calling
`Fetch()` in a loop.
#### Legacy: pull subscribe + fetch loop
```go
sub, _ := js.PullSubscribe("ORDERS.*", "processor")
for {
msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
for _, msg := range msgs {
fmt.Printf("Received: %s\n", string(msg.Data))
msg.Ack()
}
}
```
**New with `Fetch()`/`FetchNoWait()` (one-off batch):**
If you specifically need one-off batch fetching, `Fetch()` is available directly
on the consumer — no separate subscription step:
```go
cons, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "processor",
FilterSubject: "ORDERS.*",
})
// non-blocking, returns a `FetchResult` that provides messages and error
msgs, _ := cons.Fetch(10, jetstream.FetchMaxWait(5*time.Second))
for msg := range msgs.Messages() {
fmt.Printf("Received: %s\n", string(msg.Data()))
msg.Ack()
}
if msgs.Error() != nil {
// handle error
}
```
> **Warning:** `Fetch()`, `FetchNoWait()`, and `FetchBytes()` are one-off,
> single pull requests. They do not perform pre-buffering optimizations. For
> continuous message processing, always prefer `Consume()` or `Messages()`.
> When using `FetchBytes()`, the requested byte size must stay under the
> client's max pending bytes limit (64MB by default), otherwise it will trigger
> slow consumer errors on the underlying subscription.
### Ordered Consumers
Ordered consumers provide strictly ordered, gap-free message delivery. The library
automatically recreates the underlying consumer on sequence gaps or heartbeat
failures.
**Legacy:**
```go
sub, _ := js.Subscribe("ORDERS.*", handler, nats.OrderedConsumer())
```
**New:**
```go
cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{
FilterSubjects: []string{"ORDERS.*"},
})
// Use the same consumption methods as regular consumers
cc, _ := cons.Consume(func(msg jetstream.Msg) {
fmt.Printf("Received: %s\n", string(msg.Data()))
})
defer cc.Stop()
```
### Push Consumers
Pull consumers are recommended for most use cases, but push consumers are also
supported. Push consumers require `DeliverSubject` in their config and only
support `Consume()` (not `Fetch()` or `Messages()`).
**Legacy:**
```go
sub, _ := js.Subscribe("ORDERS.*", handler,
nats.Durable("processor"),
nats.DeliverSubject("deliver.orders"),
nats.IdleHeartbeat(30*time.Second),
)
```
**New:**
```go
cons, _ := s.CreateOrUpdatePushConsumer(ctx, jetstream.ConsumerConfig{
Durable: "processor",
FilterSubject: "ORDERS.*",
DeliverSubject: "deliver.orders",
IdleHeartbeat: 30 * time.Second,
})
cc, _ := cons.Consume(func(msg jetstream.Msg) {
fmt.Printf("Received: %s\n", string(msg.Data()))
msg.Ack()
})
defer cc.Stop()
```
### Subscription Options Mapping
Most legacy `SubOpt` options map directly to `ConsumerConfig` fields. Since
consumer creation is explicit, these are set at creation time rather than passed
as subscription options.
| Legacy SubOpt | New ConsumerConfig field |
|-------------------------------------|---------------------------------------------------------------------------|
| `nats.Durable("name")` | `Durable: "name"` |
| `nats.ConsumerName("name")` | `Name: "name"` |
| `nats.Description("desc")` | `Description: "desc"` |
| `nats.DeliverAll()` | `DeliverPolicy: jetstream.DeliverAllPolicy` |
| `nats.DeliverLast()` | `DeliverPolicy: jetstream.DeliverLastPolicy` |
| `nats.DeliverLastPerSubject()` | `DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy` |
| `nats.DeliverNew()` | `DeliverPolicy: jetstream.DeliverNewPolicy` |
| `nats.StartSequence(seq)` | `DeliverPolicy: jetstream.DeliverByStartSequencePolicy, OptStartSeq: seq` |
| `nats.StartTime(t)` | `DeliverPolicy: jetstream.DeliverByStartTimePolicy, OptStartTime: &t` |
| `nats.AckExplicit()` | `AckPolicy: jetstream.AckExplicitPolicy` |
| `nats.AckAll()` | `AckPolicy: jetstream.AckAllPolicy` |
| `nats.AckNone()` | `AckPolicy: jetstream.AckNonePolicy` |
| `nats.ManualAck()` | Not needed (messages are never auto-acked) |
| `nats.MaxDeliver(n)` | `MaxDeliver: n` |
| `nats.MaxAckPending(n)` | `MaxAckPending: n` |
| `nats.BackOff(durations)` | `BackOff: durations` |
| `nats.ReplayOriginal()` | `ReplayPolicy: jetstream.ReplayOriginalPolicy` |
| `nats.ReplayInstant()` | `ReplayPolicy: jetstream.ReplayInstantPolicy` |
| `nats.RateLimit(bps)` | `RateLimit: bps` |
| `nats.HeadersOnly()` | `HeadersOnly: true` |
| `nats.InactiveThreshold(dur)` | `InactiveThreshold: dur` |
| `nats.ConsumerFilterSubjects(s...)` | `FilterSubjects: s` |
| `nats.ConsumerReplicas(n)` | `Replicas: n` |
| `nats.ConsumerMemoryStorage()` | `MemoryStorage: true` |
The following options have no direct equivalent — use the consumer handle
directly instead:
| Legacy SubOpt | New equivalent |
|-------------------------------|---------------------------------------------------------------------|
| `nats.Bind(stream, consumer)` | `js.Consumer(ctx, stream, consumer)` or `s.Consumer(ctx, consumer)` |
| `nats.BindStream(stream)` | Use `js.Stream(ctx, stream)` to get a stream handle |
| `nats.OrderedConsumer()` | `js.OrderedConsumer(ctx, stream, cfg)` |
### Consume/Messages Options
`Consume()` and `Messages()` accept options that control pull request behavior:
| Option | Description |
|----------------------------|--------------------------------------------------------------|
| `PullMaxMessages(n)` | Max messages buffered (default: 500) |
| `PullMaxBytes(n)` | Max bytes buffered (mutually exclusive with PullMaxMessages) |
| `PullExpiry(dur)` | Pull request timeout (default: 30s) |
| `PullHeartbeat(dur)` | Idle heartbeat interval |
| `PullThresholdMessages(n)` | Refill threshold (default: 50% of max) |
| `PullThresholdBytes(n)` | Byte-based refill threshold |
| `StopAfter(n)` | Auto-stop after N messages |
| `ConsumeErrHandler(fn)` | Custom error handler |
### Error Handling in Consume/Messages
Both `Consume()` and `Messages()` handle server-sent status messages internally.
Some errors are terminal (stop consumption), while others are recoverable
(consumption continues).
**Terminal errors** — consumption stops automatically:
- `ErrConsumerDeleted` — the consumer was deleted on the server
- `ErrBadRequest` — invalid request (e.g. misconfigured consumer)
- Connection closed — for `Consume()` this surfaces as `ErrConnectionClosed`;
for `Messages()`, `Next()` returns `ErrMsgIteratorClosed`
**Recoverable errors** — reported via error handler, consumption continues:
- `ErrNoHeartbeat` — missed idle heartbeats from server; a new pull request
is issued automatically
- `ErrConsumerLeadershipChanged` — consumer moved to a different server in the
cluster; pending counts are reset
- `nats.ErrNoResponders` — no JetStream service available (temporary)
#### Error handling with `Consume()`
Use `ConsumeErrHandler` to be notified about both terminal and recoverable errors:
```go
cc, _ := cons.Consume(func(msg jetstream.Msg) {
msg.Ack()
}, jetstream.ConsumeErrHandler(func(cc jetstream.ConsumeContext, err error) {
if errors.Is(err, jetstream.ErrConsumerDeleted) ||
errors.Is(err, jetstream.ErrBadRequest) {
log.Fatalf("terminal consumer error: %v", err)
}
log.Printf("recoverable consumer error: %v", err)
}))
defer cc.Stop()
```
#### Error handling with `Messages()`
With `Messages()`, terminal errors are returned directly by `Next()`. By default,
`ErrNoHeartbeat` is also returned by `Next()` (controlled by
`WithMessagesErrOnMissingHeartbeat`), but it is not terminal — you can continue
calling `Next()`:
```go
iter, _ := cons.Messages()
for {
msg, err := iter.Next()
if err != nil {
if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
// iterator was stopped (either explicitly or due to connection close)
break
}
if errors.Is(err, jetstream.ErrNoHeartbeat) {
// recoverable — new pull request is issued, keep going
log.Println("missed heartbeat, re-pulling")
continue
}
// ErrConsumerDeleted, ErrBadRequest are terminal
log.Fatalf("terminal error: %v", err)
}
msg.Ack()
}
```
## Message Acknowledgement
Ack methods are similar, with minor naming changes. The main difference is that
message fields are accessed via methods instead of struct fields.
| Legacy | New |
|-------------------------|------------------------------|
| `msg.Ack()` | Unchanged |
| `msg.AckSync()` | `msg.DoubleAck(ctx)` |
| `msg.Nak()` | Unchanged |
| `msg.NakWithDelay(dur)` | Unchanged |
| `msg.InProgress()` | Unchanged |
| `msg.Term()` | Unchanged |
| N/A | `msg.TermWithReason(reason)` |
| `msg.Metadata()` | Unchanged |
### Accessing Message Data
**Legacy:** Direct struct fields on `*nats.Msg`:
```go
fmt.Println(string(msg.Data))
fmt.Println(msg.Subject)
fmt.Println(msg.Header.Get("key"))
```
**New:** Methods on `jetstream.Msg` interface:
```go
fmt.Println(string(msg.Data()))
fmt.Println(msg.Subject())
fmt.Println(msg.Headers().Get("key"))
```
## KeyValue Store
The KV API is nearly identical. The main changes are:
1. All methods take `context.Context` as the first parameter
2. New `CreateOrUpdateKeyValue()` and `UpdateKeyValue()` methods
3. Types live in the `jetstream` package
**Legacy:**
```go
js, _ := nc.JetStream()
kv, _ := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "profiles",
})
kv.Put("sue.color", []byte("blue"))
entry, _ := kv.Get("sue.color")
fmt.Println(string(entry.Value()))
watcher, _ := kv.Watch("sue.*")
defer watcher.Stop()
```
**New:**
```go
js, _ := jetstream.New(nc)
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "profiles",
})
kv.Put(ctx, "sue.color", []byte("blue"))
entry, _ := kv.Get(ctx, "sue.color")
fmt.Println(string(entry.Value()))
watcher, _ := kv.Watch(ctx, "sue.*")
defer watcher.Stop()
```
### KV Management Methods
| Legacy | New |
|-----------------------------|---------------------------------------|
| `js.KeyValue(bucket)` | `js.KeyValue(ctx, bucket)` |
| `js.CreateKeyValue(cfg)` | `js.CreateKeyValue(ctx, cfg)` |
| N/A | `js.UpdateKeyValue(ctx, cfg)` |
| N/A | `js.CreateOrUpdateKeyValue(ctx, cfg)` |
| `js.DeleteKeyValue(bucket)` | `js.DeleteKeyValue(ctx, bucket)` |
| `js.KeyValueStoreNames()` | `js.KeyValueStoreNames(ctx)` |
| `js.KeyValueStores()` | `js.KeyValueStores(ctx)` |
## Object Store
Same pattern as KV — all methods gain `context.Context`, types move to `jetstream`
package.
**Legacy:**
```go
js, _ := nc.JetStream()
os, _ := js.CreateObjectStore(&nats.ObjectStoreConfig{
Bucket: "configs",
})
os.PutString("config-1", "data")
result, _ := os.Get("config-1")
data, _ := io.ReadAll(result)
```
**New:**
```go
js, _ := jetstream.New(nc)
os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{
Bucket: "configs",
})
os.PutString(ctx, "config-1", "data")
result, _ := os.Get(ctx, "config-1")
data, _ := io.ReadAll(result)
```
### Object Store Management Methods
| Legacy | New |
|--------------------------------|------------------------------------------|
| `js.ObjectStore(bucket)` | `js.ObjectStore(ctx, bucket)` |
| `js.CreateObjectStore(cfg)` | `js.CreateObjectStore(ctx, cfg)` |
| N/A | `js.UpdateObjectStore(ctx, cfg)` |
| N/A | `js.CreateOrUpdateObjectStore(ctx, cfg)` |
| `js.DeleteObjectStore(bucket)` | `js.DeleteObjectStore(ctx, bucket)` |

View File

@@ -121,7 +121,7 @@ func main() {
messageCounter++
}
fmt.Printf("Received %d messages\n", messageCounter)
fmt.Printf("received %d messages\n", messageCounter)
if msgs.Error() != nil {
fmt.Println("Error during Fetch(): ", msgs.Error())
@@ -224,7 +224,7 @@ _ = s.Purge(ctx, jetstream.WithPurgeSequence(100))
_ = s.Purge(ctx, jetstream.WithPurgeKeep(10))
```
- Get and delete messages from a stream
- Get and delete messages from a stream
```go
// get message from stream with sequence number == 100
@@ -240,7 +240,7 @@ _ = s.DeleteMsg(ctx, 100)
- Get information about a stream
```go
// Fetches the latest stream info from server
// Fetches latest stream info from server
info, _ := s.Info(ctx)
fmt.Println(info.Config.Name)
@@ -310,7 +310,7 @@ cons2 := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
// or an illegal property is to be updated (e.g. AckPolicy)
updated, _ := js.UpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
Description: "updated consumer",
Description: "updated consumer",
})
// get consumer handle
@@ -336,7 +336,7 @@ cons, _ := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
})
// get consumer handle
cons, _ = stream.Consumer(ctx, "foo")
cons, _ = stream.Consumer(ctx, "foo")
// delete a consumer
stream.DeleteConsumer(ctx, "foo")
@@ -395,19 +395,20 @@ js, _ := jetstream.New(nc)
// create a consumer (this is an idempotent operation)
cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{
// Filter results from "ORDERS" stream by specific subject
FilterSubjects: []string{"ORDERS.A"},
FilterSubjects: []string{"ORDERS.A"},
})
```
### Receiving messages from pull consumers
The `Consumer` interface allows fetching messages on demand, with a
pre-defined batch size or byte limit, or continuous push-like receiving of
The `Consumer` interface allows fetching messages on demand, with a
pre-defined batch size or byte limit, or continuous push-like receiving of
messages.
#### __Single fetch__
This pattern allows fetching a defined number of messages in a single RPC.
This pattern allows fetching a defined number of messages in a single
RPC.
- Using `Fetch` or `FetchBytes`, consumer will return up to the provided number
of messages/bytes. By default, `Fetch()` will wait 30 seconds before timing out
@@ -480,10 +481,10 @@ single messages on demand.
Subject filtering is achieved by configuring a consumer with a `FilterSubject`
value.
##### Using `Consume()` to receive messages in a callback
##### Using `Consume()` to receive messages in a callback
```go
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
// receive messages from ORDERS.A subject only
FilterSubject: "ORDERS.A"
@@ -497,7 +498,7 @@ consContext, _ := c.Consume(func(msg jetstream.Msg) {
defer consContext.Stop()
```
Similar to `Messages()`, `Consume()` can be supplied with options to modify
Similarly to `Messages()`, `Consume()` can be supplied with options to modify
the behavior of a single pull request:
- `PullMaxMessages(int)` - up to provided number of messages will be buffered
@@ -510,6 +511,7 @@ the behavior of a single pull request:
request. If the value is set too low, the consumer will stall and not be able
to consume messages.
- `PullExpiry(time.Duration)` - timeout on a single pull request to the server
type PullThresholdMessages int
- `PullThresholdMessages(int)` - amount of messages which triggers refilling the
buffer
- `PullThresholdBytes(int)` - amount of bytes which triggers refilling the
@@ -519,10 +521,10 @@ request. An error will be triggered if at least 2 heartbeats are missed
- `ConsumeErrHandler(func (ConsumeContext, error))` - when used, sets a
custom error handler on `Consume()`, allowing e.g. tracking missing
heartbeats.
- `PullMaxMessagesWithBytesLimit(int, int)` - up to the provided number of messages
will be buffered and a single fetch size will be limited to the provided value.
- `PullMaxMessagesWithBytesLimit` - up to the provided number of messages will
be buffered and a single fetch size will be limited to the provided value.
This is an advanced option and should be used with caution. Most of the time,
`PullMaxMessages` or `PullMaxBytes` should be used instead. Note that the byte
`PullMaxMessages` or `PullMaxBytes` should be used instead. Note that the byte
limit should never be set to a value lower than the maximum message size that
can be expected from the server. If the byte limit is lower than the maximum
message size, the consumer will stall and not be able to consume messages.
@@ -573,10 +575,10 @@ iter, _ := cons.Messages(jetstream.PullMaxMessages(10), jetstream.PullMaxBytes(1
- `PullHeartbeat(time.Duration)` - idle heartbeat duration for a single pull
request. An error will be triggered if at least 2 heartbeats are missed (unless
`WithMessagesErrOnMissingHeartbeat(false)` is used)
- `PullMaxMessagesWithBytesLimit(int, int)` - up to the provided number of messages
will be buffered and a single fetch size will be limited to the provided value.
- `PullMaxMessagesWithBytesLimit` - up to the provided number of messages will
be buffered and a single fetch size will be limited to the provided value.
This is an advanced option and should be used with caution. Most of the time,
`PullMaxMessages` or `PullMaxBytes` should be used instead. Note that the byte
`PullMaxMessages` or `PullMaxBytes` should be used instead. Note that the byte
limit should never be set to a value lower than the maximum message size that
can be expected from the server. If the byte limit is lower than the maximum
message size, the consumer will stall and not be able to consume messages.
@@ -620,7 +622,7 @@ can be set to prevent the consumer from receiving more messages than it can
handle.
```go
cons, _ := js.CreateOrUpdatePushConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
cons, _ := js.CreateOrUpdatePushConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
DeliverSubject: nats.NewInbox()
AckPolicy: jetstream.AckExplicitPolicy,
// receive messages from ORDERS.A subject only
@@ -669,7 +671,7 @@ setting various headers. Additionally, for `PublishMsg()` headers can be set
directly on `nats.Msg`.
```go
// All 3 implementations work identically
// All 3 implementations work identically
ack, err := js.PublishMsg(ctx, &nats.Msg{
Data: []byte("hello"),
Subject: "ORDERS.new",
@@ -970,14 +972,14 @@ js.DeleteObjectStore(ctx, "configs")
Object Stores support Watchers, which can be used to watch for changes on
objects in a given bucket. Watcher will receive a notification on a channel when
a change occurs. By default, watcher will return the latest information for all
a change occurs. By default, watcher will return latest information for all
objects in a bucket. After sending all initial values, watcher will send nil on
the channel to signal that all initial values have been sent and it will start
sending updates when changes occur.
>__NOTE:__ Watchers do not retrieve values for objects, only metadata (containing
>information such as object name, bucket name, object size etc.). If object data
>is required, the `Get` method should be used.
>is required, `Get` method should be used.
Watcher supports several configuration options:

View File

@@ -50,11 +50,11 @@ const (
// apiConsumerCreateT is used to create consumers.
apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"
// apiConsumerCreateWithFilterSubjectT is used to create consumers with a filter subject.
// apiConsumerCreateWithFilterSubjectT is used to create consumers with a filter subject.
// It accepts stream name, consumer name and filter subject.
apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"
// apiConsumerInfoT is used to retrieve consumer information.
// apiConsumerInfoT is used to retrieve consumer information.
apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
// apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
@@ -96,7 +96,7 @@ const (
// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"
// apiDirectMsgGetT is the endpoint to perform a direct get of a message.
// apiDirectMsgGetT is the endpoint to perform a direct get of a message.
apiDirectMsgGetT = "DIRECT.GET.%s"
// apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.

View File

@@ -44,13 +44,13 @@ type (
// deletes and restarts. They provide limited configuration options
// using [OrderedConsumerConfig].
//
// Consumer provides methods for optimized continuous consumption of messages
// Consumer provides methods for optimized continuous consumption of messages
// using Consume and Messages methods, as well as simple one-off messages
// retrieval using Fetch and Next methods.
Consumer interface {
// Fetch is used to retrieve up to a provided number of messages from a
// stream. This method will send a single request and deliver either all
// requested messages unless the timeout is met earlier. Fetch timeout
// requested messages unless the timeout is met earlier. Fetch timeout
// defaults to 30 seconds and can be configured using FetchMaxWait
// option.
//
@@ -74,9 +74,9 @@ type (
// subscription is created for each execution.
Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
// FetchBytes is used to retrieve up to a provided number of bytes from
// the stream. This method will send a single request and deliver the
// provided number of bytes unless the timeout is met earlier. FetchBytes
// FetchBytes is used to retrieve up to a provided number of bytes from
// the stream. This method will send a single request and deliver the
// provided number of bytes unless the timeout is met earlier. FetchBytes
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
//
@@ -84,7 +84,7 @@ type (
// 10 seconds. For shorter requests, the idle heartbeat is disabled.
// This can be configured using FetchHeartbeat option. If a client does
// not receive a heartbeat message from a stream for more than 2 times
// the idle heartbeat setting, FetchBytes will return [ErrNoHeartbeat].
// the idle heartbeat setting, FetchBytes will return [ErrNoHeartbeat].
//
// FetchBytes is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
@@ -127,7 +127,7 @@ type (
// option, which provides information about errors encountered during
// consumption (both transient and terminal)
// - Consume can be configured to stop after a certain number of
// messages have been received using StopAfter option.
// messages have been received using StopAfter option.
// - Consume can be optimized for throughput or memory usage using
// PullExpiry, PullMaxMessages, PullMaxBytes and PullHeartbeat options.
// Unless there is a specific use case, these options should not be used.
@@ -136,8 +136,8 @@ type (
// the consumer.
Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error)
// Messages returns MessagesContext, allowing continuous iteration
// over messages in a stream. Messages can be configured using
// Messages returns MessagesContext, allowing continuously iterating
// over messages on a stream. Messages can be configured using
// PullMessagesOpt options:
//
// - Messages can be optimized for throughput or memory usage using
@@ -149,8 +149,8 @@ type (
Messages(opts ...PullMessagesOpt) (MessagesContext, error)
// Next is used to retrieve the next message from the consumer. This
// method will block until the message is retrieved or the timeout
// is reached.
// method will block until the message is retrieved or timeout is
// reached.
Next(opts ...FetchOpt) (Msg, error)
// Info fetches current ConsumerInfo from the server.
@@ -496,12 +496,12 @@ func resumeConsumer(ctx context.Context, js *jetStream, stream, consumer string)
return pauseConsumer(ctx, js, stream, consumer, nil)
}
func validateConsumerName(name string) error {
if name == "" {
return fmt.Errorf("%w: name is required", ErrInvalidConsumerName)
func validateConsumerName(dur string) error {
if dur == "" {
return fmt.Errorf("%w: '%s'", ErrInvalidConsumerName, "name is required")
}
if strings.ContainsAny(name, ">*. /\\") {
return fmt.Errorf("%w: '%s'", ErrInvalidConsumerName, name)
if strings.ContainsAny(dur, ">*. /\\") {
return fmt.Errorf("%w: '%s'", ErrInvalidConsumerName, dur)
}
return nil
}

View File

@@ -216,14 +216,14 @@ type (
// settings from stream's ConsumerLimits. If neither are set, server
// default is 5 seconds.
//
// A consumer is considered inactive if no pull requests are received by
// the server (for pull consumers), or no interest is detected on the
// deliver subject (for push consumers), not if there are no
// A consumer is considered inactive if no pull requests are received by
// the server (for pull consumers), or no interest is detected on the
// deliver subject (for push consumers), not if there are no
// messages to be delivered.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
// Replicas is the number of replicas for the consumer's state. By
// default, consumers inherit the number of replicas from the stream.
// Replicas is the number of replicas for the consumer's state. By
// default, consumers inherit the number of replicas from the stream.
Replicas int `json:"num_replicas"`
// MemoryStorage is a flag to force the consumer to use memory storage
@@ -243,19 +243,19 @@ type (
// PauseUntil is for suspending the consumer until the deadline.
PauseUntil *time.Time `json:"pause_until,omitempty"`
// PriorityPolicy represents the priority policy the consumer is set to.
// PriorityPolicy represents the priority policy the consumer is set to.
// Requires nats-server v2.11.0 or later.
PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"`
// PinnedTTL represents the time after which the client will be unpinned
// if no new pull requests are sent. Used with PriorityPolicyPinned.
// if no new pull requests are sent. Used with PriorityPolicyPinned.
// Requires nats-server v2.11.0 or later.
PinnedTTL time.Duration `json:"priority_timeout,omitempty"`
// PriorityGroups is a list of priority groups this consumer supports.
PriorityGroups []string `json:"priority_groups,omitempty"`
// Fields specific to push consumers:
// Fields specific to push consumers:
// DeliverSubject is the subject to deliver messages to for push consumers
DeliverSubject string `json:"deliver_subject,omitempty"`
@@ -264,9 +264,9 @@ type (
DeliverGroup string `json:"deliver_group,omitempty"`
// FlowControl is a flag to enable flow control for the consumer.
// When set, the server will regularly send an empty message with status
// header 100 and a reply subject. Consumers must reply to these
// messages to control the rate of message delivery.
// When set, the server will regularly send an empty message with status
// header 100 and a reply subject. Consumers must reply to these
// messages to control the rate of message delivery.
FlowControl bool `json:"flow_control,omitempty"`
// IdleHeartbeat enables push consumer idle heartbeat messages.
@@ -302,9 +302,9 @@ type (
// ReplayPolicy defines the rate at which messages are sent to the
// consumer. If ReplayOriginalPolicy is set, messages are sent in the
// same intervals in which they were stored on the stream. This can be
// used e.g. to simulate production traffic in development environments.
// If ReplayInstantPolicy is set, messages are sent as fast as possible.
// same intervals in which they were stored on stream. This can be used
// e.g. to simulate production traffic in development environments. If
// ReplayInstantPolicy is set, messages are sent as fast as possible.
// Defaults to ReplayInstantPolicy.
ReplayPolicy ReplayPolicy `json:"replay_policy"`
@@ -317,12 +317,12 @@ type (
// (and no payload). Defaults to false.
HeadersOnly bool `json:"headers_only,omitempty"`
// MaxResetAttempts is the maximum number of attempts to recreate the
// consumer in a single recovery cycle. Defaults to unlimited.
// Maximum number of attempts for the consumer to be recreated in a
// single recreation cycle. Defaults to unlimited.
MaxResetAttempts int
// Metadata is a set of application-defined key-value pairs for
// associating metadata with the consumer. This feature requires
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
@@ -386,7 +386,7 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
case jsonString("prioritized"):
*p = PriorityPolicyPrioritized
default:
return fmt.Errorf("nats: cannot unmarshal %q", data)
return fmt.Errorf("nats: cannot unmarshal %q", data)
}
return nil
}
@@ -446,7 +446,7 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
case jsonString("last_per_subject"):
*p = DeliverLastPerSubjectPolicy
default:
return fmt.Errorf("nats: cannot unmarshal %q", data)
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
@@ -509,7 +509,7 @@ func (p *AckPolicy) UnmarshalJSON(data []byte) error {
case jsonString("explicit"):
*p = AckExplicitPolicy
default:
return fmt.Errorf("nats: cannot unmarshal %q", data)
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
@@ -554,7 +554,7 @@ func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
case jsonString("original"):
*p = ReplayOriginalPolicy
default:
return fmt.Errorf("nats: cannot unmarshal %q", data)
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}

View File

@@ -151,7 +151,7 @@ var (
// FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}
// ErrOverlappingFilterSubjects is returned when filter subjects overlap when
// ErrOverlappingFilterSubjects is returned when filter subjects overlap when
// creating consumer.
ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}
@@ -167,8 +167,8 @@ var (
// already created in the server.
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}
// ErrConsumerNameAlreadyInUse is an error returned when attempting to create
// a consumer with a name that is already in use.
// ErrConsumerNameAlreadyInUse is an error returned when attempting to create
// a consumer with a name that is already in use.
ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}
// ErrNotPullConsumer is returned when attempting to fetch or create pull
@@ -289,8 +289,8 @@ var (
// shutdown.
ErrServerShutdown JetStreamError = &jsError{message: "server shutdown"}
// ErrOrderedConsumerReset indicates that the ordered consumer was
// automatically reset and recreated to preserve message ordering.
// ErrOrderedConsumerReset indicates that the ordered consumer was
// automatically reset and recreated to preserve message ordering.
ErrOrderedConsumerReset JetStreamError = &jsError{message: "recreating ordered consumer"}
// ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already
@@ -353,7 +353,7 @@ var (
// deleted.
ErrKeyDeleted JetStreamError = &jsError{message: "key was deleted"}
// ErrHistoryTooLarge is returned when provided history limit is larger than
// ErrHistoryTooLarge is returned when provided history limit is larger than
// 64.
ErrHistoryTooLarge JetStreamError = &jsError{message: "history limited to a max of 64"}

View File

@@ -34,9 +34,9 @@ type (
//
// - Publishing messages to a stream using [Publisher].
// - Managing streams using [StreamManager].
// - Managing consumers using [StreamConsumerManager]. These are the same
// methods available on [Stream], but exposed here as a shortcut that bypasses
// stream lookup.
// - Managing consumers using [StreamConsumerManager]. Those are the same
// methods as on [Stream], but are available as a shortcut to a consumer
// bypassing stream lookup.
// - Managing KeyValue stores using [KeyValueManager].
// - Managing Object Stores using [ObjectStoreManager].
//
@@ -95,8 +95,8 @@ type (
// be bound to a stream) and nats.Message.
//
// PublishMsgAsync does not guarantee that the message has been
// received by the server. It only guarantees that the message has been
// sent to the server and thus messages can be stored in the stream
// received by the server. It only guarantees that the message has been
// out of order in case of retries.
PublishMsgAsync(msg *nats.Msg, opts ...PublishOpt) (PubAckFuture, error)
@@ -110,7 +110,7 @@ type (
// server.
PublishAsyncComplete() <-chan struct{}
// CleanupPublisher will clean up the publishing side of JetStreamContext.
// CleanupPublisher will cleanup the publishing side of JetStreamContext.
//
// This will unsubscribe from the internal reply subject if needed.
// All pending async publishes will fail with ErrJetStreamContextClosed.
@@ -129,7 +129,7 @@ type (
// CreateOrUpdateStream and Stream methods return a [Stream] interface, allowing
// to operate on a stream.
StreamManager interface {
// CreateStream creates a new stream with the given config and returns an
// CreateStream creates a new stream with given config and returns an
// interface to operate on it. If stream with given name already exists
// and its configuration differs from the provided one,
// ErrStreamNameAlreadyInUse is returned.
@@ -139,7 +139,7 @@ type (
// ErrStreamNotFound is returned.
UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
// CreateOrUpdateStream creates a stream with the given config. If stream
// CreateOrUpdateStream creates a stream with given config. If stream
// already exists, it will be updated (if possible).
CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
@@ -147,7 +147,7 @@ type (
// If stream does not exist, ErrStreamNotFound is returned.
Stream(ctx context.Context, stream string) (Stream, error)
// StreamNameBySubject returns the name of the stream that listens on the given
// StreamNameBySubject returns the name of the stream that listens on the given
// subject. If no stream is bound to given subject, ErrStreamNotFound
// is returned.
StreamNameBySubject(ctx context.Context, subject string) (string, error)
@@ -156,11 +156,11 @@ type (
// exist, ErrStreamNotFound is returned.
DeleteStream(ctx context.Context, stream string) error
// ListStreams returns StreamInfoLister, enabling iteration over a
// ListStreams returns StreamInfoLister, enabling iterating over a
// channel of stream infos.
ListStreams(context.Context, ...StreamListOpt) StreamInfoLister
// StreamNames returns a StreamNameLister, enabling iteration over a
// StreamNames returns a StreamNameLister, enabling iterating over a
// channel of stream names.
StreamNames(context.Context, ...StreamListOpt) StreamNameLister
}
@@ -169,11 +169,11 @@ type (
// available as a part of [JetStream] interface. This is an alternative to
// [Stream] interface, allowing to bypass stream lookup. CreateConsumer,
// UpdateConsumer, CreateOrUpdateConsumer and Consumer methods return a
// [Consumer] interface, allowing operation on a consumer (e.g. consume
// [Consumer] interface, allowing to operate on a consumer (e.g. consume
// messages).
StreamConsumerManager interface {
// CreateOrUpdateConsumer creates a consumer on a given stream with
// the given config. If consumer already exists, it will be updated (if
// given config. If consumer already exists, it will be updated (if
// possible). Consumer interface is returned, allowing to operate on a
// consumer (e.g. fetch messages).
CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
@@ -191,11 +191,10 @@ type (
// returned, allowing to operate on a consumer (e.g. fetch messages).
UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// OrderedConsumer returns a client-managed ordered consumer for the given stream.
// Ordered consumers use ephemeral pull consumers and automatically reset
// themselves when ordering is lost. The client tracks state in memory and
// recreates the underlying consumer as needed, making them resilient to deletes
// and restarts while ensuring message order is preserved.
// OrderedConsumer returns an OrderedConsumer instance. OrderedConsumer
// are managed by the library and provide a simple way to consume
// messages from a stream. Ordered consumers are ephemeral in-memory
// pull consumers and are resilient to deletes and restarts.
OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error)
// Consumer returns an interface to an existing consumer, allowing processing
@@ -214,7 +213,7 @@ type (
ResumeConsumer(ctx context.Context, stream string, consumer string) (*ConsumerPauseResponse, error)
// CreateOrUpdatePushConsumer creates a push consumer on a given stream with
// the given config. If consumer already exists, it will be updated (if
// given config. If consumer already exists, it will be updated (if
// possible). Consumer interface is returned, allowing to consume messages.
CreateOrUpdatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error)
@@ -235,7 +234,7 @@ type (
// of messages. If consumer does not exist, ErrConsumerNotFound is
// returned.
//
// It returns ErrNotPushConsumer if the consumer is not a push consumer (delivery subject is not set).
// It returns ErrNotPushConsumer if the consumer is not a push consumer (deliver subject is not set).
PushConsumer(ctx context.Context, stream string, consumer string) (PushConsumer, error)
}
@@ -566,7 +565,7 @@ func (js *jetStream) Options() JetStreamOptions {
return opts
}
// CreateStream creates a new stream with the given config and returns an
// CreateStream creates a new stream with given config and returns an
// interface to operate on it. If stream with given name already exists,
// ErrStreamNameAlreadyInUse is returned.
func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error) {
@@ -674,7 +673,7 @@ func convertStreamConfigDomains(cfg StreamConfig) (StreamConfig, error) {
}
}
// Check sources for the same conversion.
// Check sources for the same.
if len(ncfg.Sources) > 0 {
ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...)
for i, ss := range ncfg.Sources {
@@ -755,7 +754,7 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
}, nil
}
// CreateOrUpdateStream creates a stream with the given config. If stream
// CreateOrUpdateStream creates a stream with given config. If stream
// already exists, it will be updated (if possible).
func (js *jetStream) CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) {
s, err := js.UpdateStream(ctx, cfg)
@@ -824,7 +823,7 @@ func (js *jetStream) DeleteStream(ctx context.Context, name string) error {
}
// CreateOrUpdateConsumer creates a consumer on a given stream with
// the given config. If consumer already exists, it will be updated (if
// given config. If consumer already exists, it will be updated (if
// possible). Consumer interface is returned, allowing to operate on a
// consumer (e.g. fetch messages).
func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
@@ -906,7 +905,7 @@ func (js *jetStream) DeleteConsumer(ctx context.Context, stream string, name str
}
// CreateOrUpdatePushConsumer creates a push consumer on a given stream with
// the given config. If consumer already exists, it will be updated (if
// given config. If consumer already exists, it will be updated (if
// possible). Consumer interface is returned, allowing to consume messages.
func (js *jetStream) CreateOrUpdatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error) {
if err := validateStreamName(stream); err != nil {
@@ -1015,7 +1014,7 @@ func (js *jetStream) AccountInfo(ctx context.Context) (*AccountInfo, error) {
return &resp.AccountInfo, nil
}
// ListStreams returns StreamInfoLister, enabling iteration over a
// ListStreams returns StreamInfoLister, enabling iterating over a
// channel of stream infos.
func (js *jetStream) ListStreams(ctx context.Context, opts ...StreamListOpt) StreamInfoLister {
l := &streamLister{
@@ -1069,7 +1068,7 @@ func (s *streamLister) Err() error {
return s.err
}
// StreamNames returns a StreamNameLister, enabling iteration over a
// StreamNames returns a StreamNameLister, enabling iterating over a
// channel of stream names.
func (js *jetStream) StreamNames(ctx context.Context, opts ...StreamListOpt) StreamNameLister {
l := &streamLister{
@@ -1113,7 +1112,7 @@ func (js *jetStream) StreamNames(ctx context.Context, opts ...StreamListOpt) Str
return l
}
// StreamNameBySubject returns the name of the stream bound to the given
// StreamNameBySubject returns a stream name stream listening on given
// subject. If no stream is bound to given subject, ErrStreamNotFound
// is returned.
func (js *jetStream) StreamNameBySubject(ctx context.Context, subject string) (string, error) {
@@ -1265,7 +1264,7 @@ func (js *jetStream) cleanupReplySub() {
js.publisher.replySub = nil
}
if js.publisher.connStatusCh != nil {
js.conn.RemoveStatusListener(js.publisher.connStatusCh)
close(js.publisher.connStatusCh)
js.publisher.connStatusCh = nil
}
js.publisher.Unlock()

View File

@@ -250,7 +250,7 @@ type PullMaxBytes int
func (max PullMaxBytes) configureConsume(opts *consumeOpts) error {
if max <= 0 {
return fmt.Errorf("%w: max bytes must be greater than 0", ErrInvalidOption)
return fmt.Errorf("%w: max bytes must be greater then 0", ErrInvalidOption)
}
opts.MaxBytes = int(max)
return nil
@@ -258,7 +258,7 @@ func (max PullMaxBytes) configureConsume(opts *consumeOpts) error {
func (max PullMaxBytes) configureMessages(opts *consumeOpts) error {
if max <= 0 {
return fmt.Errorf("%w: max bytes must be greater than 0", ErrInvalidOption)
return fmt.Errorf("%w: max bytes must be greater then 0", ErrInvalidOption)
}
opts.MaxBytes = int(max)
return nil

View File

@@ -178,7 +178,7 @@ type (
// the key value store in a streaming fashion (on a channel).
ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error)
// ListKeysFiltered returns a KeyLister for filtered keys in the bucket.
// ListKeysFiltered ListKeysWithFilters returns a KeyLister for filtered keys in the bucket.
ListKeysFiltered(ctx context.Context, filters ...string) (KeyLister, error)
// History will return all historical values for the key (up to
@@ -241,7 +241,7 @@ type (
// subject after it's stored.
RePublish *RePublish `json:"republish,omitempty"`
// Mirror defines the configuration for mirroring another KeyValue
// Mirror defines the consiguration for mirroring another KeyValue
// store.
Mirror *StreamSource `json:"mirror,omitempty"`
@@ -328,9 +328,6 @@ type (
// Metadata returns the metadata associated with the bucket.
Metadata() map[string]string
// Config returns the configuration for the bucket.
Config() KeyValueConfig
}
// KeyWatcher is what is returned when doing a watch. It can be used to
@@ -834,27 +831,6 @@ func (s *KeyValueBucketStatus) LimitMarkerTTL() time.Duration {
return s.info.Config.SubjectDeleteMarkerTTL
}
// Config returns the configuration for the bucket.
func (s *KeyValueBucketStatus) Config() KeyValueConfig {
return KeyValueConfig{
Bucket: s.bucket,
Description: s.info.Config.Description,
MaxValueSize: s.info.Config.MaxMsgSize,
History: uint8(s.info.Config.MaxMsgsPerSubject),
TTL: s.info.Config.MaxAge,
MaxBytes: s.info.Config.MaxBytes,
Storage: s.info.Config.Storage,
Replicas: s.info.Config.Replicas,
Placement: s.info.Config.Placement,
RePublish: s.info.Config.RePublish,
Mirror: s.info.Config.Mirror,
Sources: s.info.Config.Sources,
Compression: s.info.Config.Compression != NoCompression,
Metadata: s.info.Config.Metadata,
LimitMarkerTTL: s.info.Config.SubjectDeleteMarkerTTL,
}
}
// Metadata returns the metadata associated with the bucket.
func (s *KeyValueBucketStatus) Metadata() map[string]string {
return s.info.Config.Metadata
@@ -1204,8 +1180,6 @@ func (w *watcher) Stop() error {
return w.sub.Unsubscribe()
}
// WatchFiltered will watch for any updates to keys that match the provided
// key filters. It can be configured with the same options as Watch.
func (kv *kvs) WatchFiltered(ctx context.Context, keys []string, opts ...WatchOpt) (KeyWatcher, error) {
for _, key := range keys {
if !searchKeyValid(key) {
@@ -1390,7 +1364,7 @@ type keyLister struct {
keys chan string
}
// ListKeys will return all keys.
// Keys will return all keys.
func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error) {
opts = append(opts, IgnoreDeletes(), MetaOnly())
watcher, err := kv.WatchAll(ctx, opts...)
@@ -1417,7 +1391,7 @@ func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error
return kl, nil
}
// ListKeysFiltered returns a KeyLister for filtered keys in the bucket.
// ListKeysWithFilters returns a channel of keys matching the provided filters using WatchFiltered.
func (kv *kvs) ListKeysFiltered(ctx context.Context, filters ...string) (KeyLister, error) {
watcher, err := kv.WatchFiltered(ctx, filters, IgnoreDeletes(), MetaOnly())
if err != nil {

View File

@@ -29,7 +29,7 @@ func (opt watchOptFn) configureWatcher(opts *watchOpts) error {
func IncludeHistory() WatchOpt {
return watchOptFn(func(opts *watchOpts) error {
if opts.updatesOnly {
return fmt.Errorf("%w: include history cannot be used with updates only", ErrInvalidOption)
return fmt.Errorf("%w: include history can not be used with updates only", ErrInvalidOption)
}
opts.includeHistory = true
return nil
@@ -41,14 +41,14 @@ func IncludeHistory() WatchOpt {
func UpdatesOnly() WatchOpt {
return watchOptFn(func(opts *watchOpts) error {
if opts.includeHistory {
return fmt.Errorf("%w: updates only cannot be used with include history", ErrInvalidOption)
return fmt.Errorf("%w: updates only can not be used with include history", ErrInvalidOption)
}
opts.updatesOnly = true
return nil
})
}
// IgnoreDeletes will prevent the key watcher from passing any deleted keys.
// IgnoreDeletes will have the key watcher not pass any deleted keys.
func IgnoreDeletes() WatchOpt {
return watchOptFn(func(opts *watchOpts) error {
opts.ignoreDeletes = true
@@ -56,7 +56,7 @@ func IgnoreDeletes() WatchOpt {
})
}
// MetaOnly instructs the key watcher to retrieve only the entry metadata, not
// MetaOnly instructs the key watcher to retrieve only the entry meta data, not
// the entry value.
func MetaOnly() WatchOpt {
return watchOptFn(func(opts *watchOpts) error {
@@ -121,7 +121,7 @@ func (opt createOptFn) configureCreate(opts *createOpts) error {
}
// KeyTTL sets the TTL for the key. This is the time after which the key will be
// automatically deleted. The TTL is set when the key is created and cannot be
// automatically deleted. The TTL is set when the key is created and can not be
// changed later. This requires LimitMarkerTTL to be enabled on the bucket.
func KeyTTL(ttl time.Duration) KVCreateOpt {
return createOptFn(func(opts *createOpts) error {

View File

@@ -180,7 +180,7 @@ const (
// level. Server will reject the message if it is not the case.
//
// This can be set when publishing messages using [WithExpectLastSequence]
// option.
// option. option.
ExpectedLastSeqHeader = "Nats-Expected-Last-Sequence"
// ExpectedLastSubjSeqHeader contains the expected last sequence number on
@@ -227,7 +227,7 @@ const (
// SequenceHeader contains the original sequence number of the message.
SequenceHeader = "Nats-Sequence"
// TimeStampHeaer contains the original timestamp of the message.
// TimeStampHeader contains the original timestamp of the message.
TimeStampHeaer = "Nats-Time-Stamp"
// SubjectHeader contains the original subject the message was published to.
@@ -291,7 +291,7 @@ func (m *jetStreamMsg) Headers() nats.Header {
return m.msg.Header
}
// Subject returns a subject on which a message was published/received.
// Subject returns a subject on which a message is published.
func (m *jetStreamMsg) Subject() string {
return m.msg.Subject
}
@@ -408,9 +408,9 @@ func (m *jetStreamMsg) checkReply() error {
return nil
}
// checkMsg returns whether the given message is a user message or a control message.
// If the status header is present, it returns an appropriate error based
// on the status code (404, etc.)
// Returns if the given message is a user message or not, and if
// checkSts() is true, returns appropriate error based on the
// content of the status (404, etc..)
func checkMsg(msg *nats.Msg) (bool, error) {
// If payload or no header, consider this a user message
if len(msg.Data) > 0 || len(msg.Header) == 0 {

View File

@@ -675,17 +675,8 @@ func (obs *obs) Put(ctx context.Context, meta ObjectMeta, r io.Reader) (*ObjectI
return perr
}
opts := []JetStreamOpt{
WithPublishAsyncErrHandler(func(js JetStream, _ *nats.Msg, err error) { setErr(err) }),
}
// if context deadline is not set, use default JetStream timeout (per publish)
if _, ok := ctx.Deadline(); !ok {
opts = append(opts, WithPublishAsyncTimeout(obs.js.opts.DefaultTimeout))
}
// Create our own JS context to handle errors etc.
pubJS, err := New(obs.js.conn, opts...)
pubJS, err := New(obs.js.conn, WithPublishAsyncErrHandler(func(js JetStream, _ *nats.Msg, err error) { setErr(err) }))
if err != nil {
return nil, err
}

View File

@@ -48,7 +48,7 @@ type (
GetMsg(ctx context.Context, seq uint64, opts ...GetMsgOpt) (*RawStreamMsg, error)
// GetLastMsgForSubject retrieves the last raw stream message stored in
// JetStream on a given subject.
// JetStream on a given subject subject.
GetLastMsgForSubject(ctx context.Context, subject string) (*RawStreamMsg, error)
// DeleteMsg deletes a message from a stream.

View File

@@ -296,7 +296,8 @@ type jsOpts struct {
}
const (
defaultRequestWait = 5 * time.Second
defaultRequestWait = 5 * time.Second
defaultAccountCheck = 20 * time.Second
)
// JetStream returns a JetStreamContext for messaging and stream management.
@@ -2834,15 +2835,12 @@ func ConsumerFilterSubjects(subjects ...string) SubOpt {
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
if sub.jsi == nil {
sub.mu.Unlock()
return nil, ErrTypeSubscription
} else if sub.jsi.consumer == _EMPTY_ {
ordered := sub.jsi.ordered
sub.mu.Unlock()
if ordered {
if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
if sub.jsi.ordered {
sub.mu.Unlock()
return nil, ErrConsumerInfoOnOrderedReset
}
sub.mu.Unlock()
return nil, ErrTypeSubscription
}

View File

@@ -105,9 +105,6 @@ type KeyValueStatus interface {
// IsCompressed indicates if the data is compressed on disk
IsCompressed() bool
// Config returns the original configuration used to create the bucket
Config() KeyValueConfig
}
// KeyWatcher is what is returned when doing a watch.
@@ -1211,24 +1208,6 @@ func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }
// IsCompressed indicates if the data is compressed on disk
func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }
func (s *KeyValueBucketStatus) Config() KeyValueConfig {
return KeyValueConfig{
Bucket: s.bucket,
Description: s.nfo.Config.Description,
MaxValueSize: s.nfo.Config.MaxMsgSize,
History: uint8(s.nfo.Config.MaxMsgsPerSubject),
TTL: s.nfo.Config.MaxAge,
MaxBytes: s.nfo.Config.MaxBytes,
Storage: s.nfo.Config.Storage,
Replicas: s.nfo.Config.Replicas,
Placement: s.nfo.Config.Placement,
RePublish: s.nfo.Config.RePublish,
Mirror: s.nfo.Config.Mirror,
Sources: s.nfo.Config.Sources,
Compression: s.nfo.Config.Compression != NoCompression,
}
}
// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)

View File

@@ -48,7 +48,7 @@ import (
// Default Constants
const (
Version = "1.49.0"
Version = "1.48.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -152,8 +152,6 @@ var (
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded")
ErrWebSocketHeadersAlreadySet = errors.New("nats: websocket connection headers already set")
ErrServerNotInPool = errors.New("nats: selected server is not in the pool")
ErrMixingWebsocketSchemes = errors.New("nats: mixing of websocket and non websocket URLs is not allowed")
)
// GetDefaultOptions returns default configuration options for the client.
@@ -244,7 +242,7 @@ type SignatureHandler func([]byte) ([]byte, error)
// AuthTokenHandler is used to generate a new token.
type AuthTokenHandler func() string
// UserInfoCB is used to pass the username and password when establishing a connection.
// UserInfoCB is used to pass the username and password when establishing connection.
type UserInfoCB func() (string, string)
// ReconnectDelayHandler is used to get from the user the desired
@@ -256,25 +254,6 @@ type ReconnectDelayHandler func(attempts int) time.Duration
// WebSocketHeadersHandler is an optional callback handler for generating token used for WebSocket connections.
type WebSocketHeadersHandler func() (http.Header, error)
// ReconnectToServerHandler is used to determine the server to reconnect to during
// the reconnection process. The handler receives a snapshot of available servers
// in the pool and should return a pointer to one of the servers from the provided
// slice and a delay before attempting the connection.
//
// Return values:
// - *Server: The server to connect to. Must be a pointer to an element from the
// provided servers slice. If the returned server is not in the pool, the library
// will fire ReconnectErrCB with ErrServerNotInPool and fall back to default
// server selection.
// - time.Duration: The delay before attempting the connection. If zero, the
// connection attempt is made immediately. If non-zero, the library sleeps
// for exactly that duration before attempting.
//
// MaxReconnect limits are enforced automatically: servers exceeding the configured
// MaxReconnect attempts are removed from the pool before the handler is called.
// To disable this limit, set MaxReconnect to a negative value.
type ReconnectToServerHandler func([]Server, ServerInfo) (*Server, time.Duration)
// asyncCB is used to preserve order for async callbacks.
type asyncCB struct {
f func()
@@ -371,7 +350,7 @@ type Options struct {
// Defaults to 60.
MaxReconnect int
// ReconnectWait sets the time to back off after attempting a reconnect
// ReconnectWait sets the time to backoff after attempting a reconnect
// to a server that we were already connected to previously.
// Defaults to 2s.
ReconnectWait time.Duration
@@ -398,7 +377,7 @@ type Options struct {
// Defaults to 2s.
Timeout time.Duration
// DrainTimeout sets the timeout for a drain operation to complete.
// DrainTimeout sets the timeout for a Drain Operation to complete.
// Defaults to 30s.
DrainTimeout time.Duration
@@ -452,25 +431,18 @@ type Options struct {
AsyncErrorCB ErrHandler
// ReconnectErrCB sets the callback that is invoked whenever a
// reconnect attempt fails.
// reconnect attempt failed
ReconnectErrCB ConnErrHandler
// ReconnectToServerCB is called before reconnection attempt.
// It is used to determine the server to reconnect to out of
// the list of available servers.
// If a reconnect attempt is not successful, this callback will
// be called again before the next attempt.
ReconnectToServerCB ReconnectToServerHandler
// ReconnectBufSize is the size of the backing bufio during reconnect.
// Once this has been exhausted publish operations will return an error.
// Defaults to 8388608 bytes (8MB).
ReconnectBufSize int
// SubChanLen is the size of the buffered channel used between the socket
// goroutine and the message delivery for SyncSubscriptions.
// Go routine and the message delivery for SyncSubscriptions.
// NOTE: This does not affect AsyncSubscriptions which are
// dictated by PendingLimits().
// dictated by PendingLimits()
// Defaults to 65536.
SubChanLen int
@@ -498,8 +470,7 @@ type Options struct {
// Token sets the token to be used when connecting to a server.
Token string
// TokenHandler designates the function used to generate the token
// used when connecting to a server.
// TokenHandler designates the function used to generate the token to be used when connecting to a server.
TokenHandler AuthTokenHandler
// Dialer allows a custom net.Dialer when forming connections.
@@ -561,17 +532,13 @@ type Options struct {
// WebSocketConnectionHeaders is an optional http request headers to be sent with the WebSocket request.
WebSocketConnectionHeaders http.Header
// WebSocketConnectionHeadersHandler is an optional callback handler for generating token used for WebSocket connections.
// WebSocketConnectionHeadersHandler is an optional callback handler for generating token used for WebSocket connections.
WebSocketConnectionHeadersHandler WebSocketHeadersHandler
// SkipSubjectValidation will disable publish subject validation.
// NOTE: This is not recommended in general, as the performance gain is minimal
// and may lead to breaking protocol.
SkipSubjectValidation bool
// IgnoreDiscoveredServers will disable adding advertised server URLs
// from INFO messages to the server pool.
IgnoreDiscoveredServers bool
}
const (
@@ -610,14 +577,14 @@ type Conn struct {
// Modifying the configuration of a running Conn is a race.
Opts Options
wg sync.WaitGroup
srvPool []*Server
current *Server
srvPool []*srv
current *srv
urls map[string]struct{} // Keep track of all known URLs (used by processInfo)
conn net.Conn
bw *natsWriter
br *natsReader
fch chan struct{}
info ServerInfo
info serverInfo
ssid int64
subsMu sync.RWMutex
subs map[int64]*Subscription
@@ -848,27 +815,18 @@ type Statistics struct {
Reconnects uint64
}
// Server represents a server in the pool of servers that the client can connect to.
type Server struct {
URL *url.URL
Reconnects int
// Tracks individual backend servers.
type srv struct {
url *url.URL
didConnect bool
reconnects int
lastErr error
isImplicit bool
tlsName string
}
func (s Server) clone() Server {
c := s
if s.URL != nil {
u := *s.URL
c.URL = &u
}
return c
}
// ServerInfo represents the information about the server that is sent in the INFO protocol message.
type ServerInfo struct {
// The INFO block received from the server.
type serverInfo struct {
ID string `json:"server_id"`
Name string `json:"server_name"`
Proto int `json:"proto"`
@@ -949,7 +907,7 @@ func Name(name string) Option {
}
}
// InProcessServer is an Option that will try to establish a connection to a NATS server
// InProcessServer is an Option that will try to establish a direction to a NATS server
// running within the process instead of dialing via TCP.
func InProcessServer(server InProcessConnProvider) Option {
return func(o *Options) error {
@@ -1129,19 +1087,6 @@ func CustomReconnectDelay(cb ReconnectDelayHandler) Option {
}
}
// ReconnectToServer is an Option to set a custom server selection callback
// for reconnection attempts. The callback receives a snapshot of available
// servers and must return a server from the pool to connect to and a delay
// duration before attempting the connection.
//
// See ReconnectToServerHandler for detailed documentation and usage examples.
func ReconnectToServer(cb ReconnectToServerHandler) Option {
return func(o *Options) error {
o.ReconnectToServerCB = cb
return nil
}
}
// PingInterval is an Option to set the period for client ping commands.
// Defaults to 2m.
func PingInterval(t time.Duration) Option {
@@ -1577,7 +1522,7 @@ func WebSocketConnectionHeadersHandler(cb WebSocketHeadersHandler) Option {
// By default, subject validation is performed to ensure that subjects
// are valid according to NATS subject syntax (no spaces newlines and tabs).
// NOTE: It is not recommended to use this option as the performance gain
// is minimal and disabling subject validation can lead to breaking protocol
// is minimal and disabling subject validation can lead breaking protocol
// rules.
func SkipSubjectValidation() Option {
return func(o *Options) error {
@@ -1586,15 +1531,6 @@ func SkipSubjectValidation() Option {
}
}
// IgnoreDiscoveredServers is an Option to disable adding advertised
// server URLs from INFO messages to the server pool.
func IgnoreDiscoveredServers() Option {
return func(o *Options) error {
o.IgnoreDiscoveredServers = true
return nil
}
}
// Handler processing
// SetDisconnectHandler will set the disconnect event handler.
@@ -1854,7 +1790,7 @@ const (
)
// Return the currently selected server
func (nc *Conn) currentServer() (int, *Server) {
func (nc *Conn) currentServer() (int, *srv) {
for i, s := range nc.srvPool {
if s == nil {
continue
@@ -1868,7 +1804,7 @@ func (nc *Conn) currentServer() (int, *Server) {
// Pop the current server and put onto the end of the list. Select head of list as long
// as number of reconnect attempts under MaxReconnect.
func (nc *Conn) selectNextServer() (*Server, error) {
func (nc *Conn) selectNextServer() (*srv, error) {
i, s := nc.currentServer()
if i < 0 {
return nil, ErrNoServers
@@ -1877,7 +1813,7 @@ func (nc *Conn) selectNextServer() (*Server, error) {
num := len(sp)
copy(sp[i:num-1], sp[i+1:num])
maxReconnect := nc.Opts.MaxReconnect
if maxReconnect < 0 || s.Reconnects < maxReconnect {
if maxReconnect < 0 || s.reconnects < maxReconnect {
nc.srvPool[num-1] = s
} else {
nc.srvPool = sp[0 : num-1]
@@ -1913,7 +1849,7 @@ const tlsScheme = "tls"
// Server Options. We will randomize the server pool unless
// the NoRandomize flag is set.
func (nc *Conn) setupServerPool() error {
nc.srvPool = make([]*Server, 0, srvPoolSize)
nc.srvPool = make([]*srv, 0, srvPoolSize)
nc.urls = make(map[string]struct{}, srvPoolSize)
// Create srv objects from each url string in nc.Opts.Servers
@@ -1950,7 +1886,7 @@ func (nc *Conn) setupServerPool() error {
// Check for Scheme hint to move to TLS mode.
for _, srv := range nc.srvPool {
if srv.URL.Scheme == tlsScheme || srv.URL.Scheme == wsSchemeTLS {
if srv.url.Scheme == tlsScheme || srv.url.Scheme == wsSchemeTLS {
// FIXME(dlc), this is for all in the pool, should be case by case.
nc.Opts.Secure = true
if nc.Opts.TLSConfig == nil {
@@ -1981,10 +1917,8 @@ func hostIsIP(u *url.URL) bool {
return net.ParseIP(u.Hostname()) != nil
}
// parseServerURL parses a server URL string into a Server struct.
// It handles scheme defaults and port defaults. Does not validate websocket consistency.
// Returns the parsed Server and whether it's a websocket URL.
func (nc *Conn) parseServerURL(sURL string, implicit, saveTLSName bool) (*Server, error) {
// addURLToPool adds an entry to the server pool
func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
if !strings.Contains(sURL, "://") {
sURL = fmt.Sprintf("%s://%s", nc.connScheme(), sURL)
}
@@ -1995,7 +1929,7 @@ func (nc *Conn) parseServerURL(sURL string, implicit, saveTLSName bool) (*Server
for i := 0; i < 2; i++ {
u, err = url.Parse(sURL)
if err != nil {
return nil, err
return err
}
if u.Port() != "" {
break
@@ -2015,9 +1949,19 @@ func (nc *Conn) parseServerURL(sURL string, implicit, saveTLSName bool) (*Server
}
}
isWS := isWebsocketScheme(u)
// We don't support mix and match of websocket and non websocket URLs.
// If this is the first URL, then we accept and switch the global state
// to websocket. After that, we will know how to reject mixed URLs.
if len(nc.srvPool) == 0 {
nc.ws = isWS
} else if isWS && !nc.ws || !isWS && nc.ws {
return errors.New("mixing of websocket and non websocket URLs is not allowed")
}
var tlsName string
if implicit {
curl := nc.current.URL
curl := nc.current.url
// Check to see if we do not have a url.User but current connected
// url does. If so copy over.
if u.User == nil && curl.User != nil {
@@ -2031,29 +1975,9 @@ func (nc *Conn) parseServerURL(sURL string, implicit, saveTLSName bool) (*Server
}
}
s := &Server{URL: u, isImplicit: implicit, tlsName: tlsName}
return s, nil
}
// addURLToPool adds an entry to the server pool
func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
s, err := nc.parseServerURL(sURL, implicit, saveTLSName)
if err != nil {
return err
}
// We don't support mix and match of websocket and non websocket URLs.
// If this is the first URL, then we accept and switch the global state
// to websocket. After that, we will know how to reject mixed URLs.
isWS := isWebsocketScheme(s.URL)
if len(nc.srvPool) == 0 {
nc.ws = isWS
} else if isWS != nc.ws {
return ErrMixingWebsocketSchemes
}
s := &srv{url: u, isImplicit: implicit, tlsName: tlsName}
nc.srvPool = append(nc.srvPool, s)
nc.urls[s.URL.Host] = struct{}{}
nc.urls[u.Host] = struct{}{}
return nil
}
@@ -2249,7 +2173,7 @@ func (nc *Conn) createConn() (err error) {
// We will auto-expand host names if they resolve to multiple IPs
hosts := []string{}
u := nc.current.URL
u := nc.current.url
if !nc.Opts.SkipHostLookup && net.ParseIP(u.Hostname()) == nil {
addrs, _ := net.LookupHost(u.Hostname())
@@ -2335,7 +2259,7 @@ func (nc *Conn) makeTLSConn() error {
if nc.current.tlsName != _EMPTY_ {
tlsCopy.ServerName = nc.current.tlsName
} else {
h, _, _ := net.SplitHostPort(nc.current.URL.Host)
h, _, _ := net.SplitHostPort(nc.current.url.Host)
tlsCopy.ServerName = h
}
}
@@ -2433,7 +2357,7 @@ func (nc *Conn) ConnectedUrl() string {
if nc.status != CONNECTED {
return _EMPTY_
}
return nc.current.URL.String()
return nc.current.url.String()
}
// ConnectedUrlRedacted reports the connected server's URL with passwords redacted
@@ -2448,7 +2372,7 @@ func (nc *Conn) ConnectedUrlRedacted() string {
if nc.status != CONNECTED {
return _EMPTY_
}
return nc.current.URL.Redacted()
return nc.current.url.Redacted()
}
// ConnectedAddr returns the connected server's IP
@@ -2663,7 +2587,7 @@ func (nc *Conn) connect() (bool, error) {
if err == nil {
nc.current.didConnect = true
nc.current.Reconnects = 0
nc.current.reconnects = 0
nc.current.lastErr = nil
break
} else {
@@ -2780,7 +2704,7 @@ func (nc *Conn) sendProto(proto string) {
func (nc *Conn) connectProto() (string, error) {
o := nc.Opts
var nkey, sig, user, pass, token, ujwt string
u := nc.current.URL.User
u := nc.current.url.User
if u != nil {
// if no password, assume username is authToken
if _, ok := u.Password(); !ok {
@@ -3075,88 +2999,17 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
}
for i := 0; len(nc.srvPool) > 0; {
var err error
var cur *Server
var callbackDelay time.Duration
var useCallbackDelay bool
if nc.Opts.ReconnectToServerCB != nil {
// Enforce MaxReconnect limits before calling the callback
maxReconnect := nc.Opts.MaxReconnect
if maxReconnect >= 0 {
// Remove servers that have exceeded MaxReconnect attempts
filtered := make([]*Server, 0, len(nc.srvPool))
for _, srv := range nc.srvPool {
if srv != nil && srv.Reconnects < maxReconnect {
filtered = append(filtered, srv)
}
}
nc.srvPool = filtered
}
// Check if we still have servers after filtering
if len(nc.srvPool) == 0 {
nc.err = ErrNoServers
break
}
// Copy server values to avoid caller modifying internal state
srvVals := make([]Server, len(nc.srvPool))
for idx, srv := range nc.srvPool {
if srv != nil {
srvVals[idx] = srv.clone()
}
}
var selectedSrv *Server
selectedSrv, callbackDelay = nc.Opts.ReconnectToServerCB(srvVals, nc.info)
if selectedSrv != nil {
idx := slices.IndexFunc(nc.srvPool, func(srv *Server) bool {
return srv != nil && srv.URL.String() == selectedSrv.URL.String()
})
if idx != -1 {
cur = nc.srvPool[idx]
nc.current = cur
useCallbackDelay = true
} else if reconnectErrCB := nc.Opts.ReconnectErrCB; reconnectErrCB != nil {
nc.ach.push(func() { reconnectErrCB(nc, ErrServerNotInPool) })
}
}
}
var doSleep bool
if cur == nil {
cur, err = nc.selectNextServer()
if err != nil {
nc.err = err
break
}
doSleep = i+1 >= len(nc.srvPool) && !forceReconnect
cur, err := nc.selectNextServer()
if err != nil {
nc.err = err
break
}
doSleep := i+1 >= len(nc.srvPool) && !forceReconnect
forceReconnect = false
nc.mu.Unlock()
if useCallbackDelay {
if callbackDelay > 0 {
i = 0
if rt == nil {
rt = time.NewTimer(callbackDelay)
} else {
rt.Reset(callbackDelay)
}
select {
case <-rqch:
rt.Stop()
nc.mu.Lock()
nc.rqch = make(chan struct{})
nc.mu.Unlock()
case <-rt.C:
}
} else {
runtime.Gosched()
}
} else if !doSleep {
if !doSleep {
i++
// Release the lock to give a chance to a concurrent nc.Close() to break the loop.
runtime.Gosched()
@@ -3202,7 +3055,7 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
}
// Mark that we tried a reconnect
cur.Reconnects++
cur.reconnects++
// Try to create a new connection
err = nc.createConn()
@@ -3237,7 +3090,7 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
// Clear out server stats for the server we connected to..
cur.didConnect = true
cur.Reconnects = 0
cur.reconnects = 0
// Send existing subscription state
nc.resendSubscriptions()
@@ -3905,7 +3758,7 @@ func (nc *Conn) processInfo(info string) error {
if info == _EMPTY_ {
return nil
}
var ncInfo ServerInfo
var ncInfo serverInfo
if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
return err
}
@@ -3916,7 +3769,7 @@ func (nc *Conn) processInfo(info string) error {
// if advertise is disabled on that server, or servers that
// did not include themselves in the async INFO protocol.
// If empty, do not remove the implicit servers from the pool.
if len(nc.info.ConnectURLs) == 0 || nc.Opts.IgnoreDiscoveredServers {
if len(nc.info.ConnectURLs) == 0 {
if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
}
@@ -3939,7 +3792,7 @@ func (nc *Conn) processInfo(info string) error {
sp := nc.srvPool
for i := 0; i < len(sp); i++ {
srv := sp[i]
curl := srv.URL.Host
curl := srv.url.Host
// Check if this URL is in the INFO protocol
_, inInfo := tmp[curl]
// Remove from the temp map so that at the end we are left with only
@@ -3947,7 +3800,7 @@ func (nc *Conn) processInfo(info string) error {
delete(tmp, curl)
// Keep servers that were set through Options, but also the one that
// we are currently connected to (even if it is a discovered server).
if !srv.isImplicit || srv.URL == nc.current.URL {
if !srv.isImplicit || srv.url == nc.current.url {
continue
}
if !inInfo {
@@ -3959,7 +3812,7 @@ func (nc *Conn) processInfo(info string) error {
}
}
// Figure out if we should save off the current non-IP hostname if we encounter a bare IP.
saveTLS := nc.current != nil && !hostIsIP(nc.current.URL)
saveTLS := nc.current != nil && !hostIsIP(nc.current.url)
// If there are any left in the tmp map, these are new (or restarted) servers
// and need to be added to the pool.
@@ -4912,7 +4765,7 @@ func (s *Subscription) IsValid() bool {
//
// For a JetStream subscription, if the library has created the JetStream
// consumer, the library will send a DeleteConsumer request to the server
// when the drain operation completes. If a failure occurs when deleting
// when the Drain operation completes. If a failure occurs when deleting
// the JetStream consumer, an error will be reported to the asynchronous
// error callback.
// If you do not wish the JetStream consumer to be automatically deleted,
@@ -6062,7 +5915,7 @@ func (nc *Conn) getServers(implicitOnly bool) []string {
if implicitOnly && !nc.srvPool[i].isImplicit {
continue
}
url := nc.srvPool[i].URL
url := nc.srvPool[i].url
servers = append(servers, fmt.Sprintf("%s://%s", url.Scheme, url.Host))
}
return servers
@@ -6218,103 +6071,6 @@ func (nc *Conn) Barrier(f func()) error {
return nil
}
// ServerPool returns a copy of the current server pool for the connection.
//
// This function should not be called from within connection callbacks to avoid
// potential deadlocks.
func (nc *Conn) ServerPool() []Server {
nc.mu.RLock()
defer nc.mu.RUnlock()
servers := make([]Server, len(nc.srvPool))
for i, srv := range nc.srvPool {
if srv != nil {
// Return a copy to avoid exposing internal state
servers[i] = srv.clone()
}
}
return servers
}
// SetServerPool allows updating the server pool for the connection. This
// replaces the existing pool with the provided list of server URLs. If the
// current server is not in the new pool, the client will switch to a server in
// the new pool on the next reconnect attempt. This function is thread-safe and
// can be called while the connection is active. It will return an error if the
// connection is closed or if any of the provided URLs are invalid.
//
// This function does not trigger an immediate reconnect. The new server
// pool will be used on the next reconnect attempt.
// If you want to trigger an immediate reconnect to apply the new server pool,
// you can call [Conn.ForceReconnect] after this function.
//
// Unless [IgnoreDiscoveredServers] is set, the client will continue to
// discover and add new servers to the pool as it receives INFO messages from
// the server. If you want to prevent this behavior and only use the servers
// provided in SetServerPool, use [IgnoreDiscoveredServers].
//
// This function should not be called from within connection callbacks to avoid
// potential deadlocks.
func (nc *Conn) SetServerPool(servers []string) error {
	nc.mu.Lock()
	defer nc.mu.Unlock()
	if nc.isClosed() {
		return ErrConnectionClosed
	}
	// Parse and validate all URLs first (without modifying state), so that a
	// single invalid entry leaves the existing pool completely untouched.
	newPool := make([]*Server, 0, len(servers))
	newURLs := make(map[string]struct{})
	for _, addr := range servers {
		s, err := nc.parseServerURL(addr, false, false)
		if err != nil {
			return err
		}
		// A connection is either websocket or plain; mixing schemes in one
		// pool is rejected.
		if isWebsocketScheme(s.URL) != nc.ws {
			return ErrMixingWebsocketSchemes
		}
		newPool = append(newPool, s)
		newURLs[s.URL.Host] = struct{}{}
	}
	// Preserve per-server state (reconnect count, connect history, last
	// error) for servers that exist in both the old and the new pool.
	for _, newSrv := range newPool {
		if idx := slices.IndexFunc(nc.srvPool, func(oldSrv *Server) bool {
			return oldSrv != nil && oldSrv.URL.String() == newSrv.URL.String()
		}); idx != -1 {
			newSrv.Reconnects = nc.srvPool[idx].Reconnects
			newSrv.didConnect = nc.srvPool[idx].didConnect
			newSrv.lastErr = nc.srvPool[idx].lastErr
		}
	}
	nc.srvPool = newPool
	nc.urls = newURLs
	// Update nc.current to point to the corresponding server in the new pool.
	// This is important because currentServer() uses pointer equality.
	if nc.current != nil {
		currentURL := nc.current.URL.String()
		found := false
		for _, s := range newPool {
			if s.URL.String() == currentURL {
				// Update nc.current to point to the server instance in the new pool.
				nc.current = s
				found = true
				break
			}
		}
		if !found && len(newPool) > 0 {
			// Current server not in new pool - point to first server in new pool.
			// This ensures selectNextServer() can find it and properly rotate.
			// NOTE(review): if the new pool is empty, nc.current keeps pointing
			// at an entry from the replaced pool — presumably reconnect logic
			// tolerates this; verify against selectNextServer().
			nc.current = newPool[0]
		}
	}
	return nil
}
// GetClientIP returns the client IP as known by the server.
// Supported as of server version 2.1.6.
func (nc *Conn) GetClientIP() (net.IP, error) {
@@ -6377,8 +6133,8 @@ func (nc *Conn) RemoveStatusListener(ch chan (Status)) {
}
for _, listeners := range nc.statListeners {
for range listeners {
delete(listeners, ch)
for l := range listeners {
delete(listeners, l)
}
}
}

4
vendor/modules.txt vendored
View File

@@ -1180,8 +1180,8 @@ github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
github.com/nats-io/nats-server/v2/server/thw
github.com/nats-io/nats-server/v2/server/tpm
# github.com/nats-io/nats.go v1.49.0
## explicit; go 1.24.0
# github.com/nats-io/nats.go v1.48.0
## explicit; go 1.23.0
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin
github.com/nats-io/nats.go/internal/parser