Merge branch 'origin/main' into 'next-release/main'

This commit is contained in:
oauth
2026-01-15 08:49:17 +00:00
13 changed files with 115 additions and 23 deletions

2
go.mod
View File

@@ -55,7 +55,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.12.3
github.com/nats-io/nats.go v1.47.0
github.com/nats-io/nats.go v1.48.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.1.2
github.com/onsi/ginkgo v1.16.5

4
go.sum
View File

@@ -916,8 +916,8 @@ github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs=
github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y=
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

View File

@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
go get github.com/nats-io/nats.go@latest
# To get a specific version:
go get github.com/nats-io/nats.go@v1.47.0
go get github.com/nats-io/nats.go@v1.48.0
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest

View File

@@ -95,7 +95,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat
s.AutoUnsubscribe(1)
defer s.Unsubscribe()
err = nc.publish(subj, inbox, hdr, data)
err = nc.publish(subj, inbox, false, hdr, data)
if err != nil {
return nil, err
}

View File

@@ -107,7 +107,7 @@ func (c *EncodedConn) Publish(subject string, v any) error {
if err != nil {
return err
}
return c.Conn.publish(subject, _EMPTY_, nil, b)
return c.Conn.publish(subject, _EMPTY_, false, nil, b)
}
// PublishRequest will perform a Publish() expecting a response on the
@@ -120,7 +120,7 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v any) error {
if err != nil {
return err
}
return c.Conn.publish(subject, reply, nil, b)
return c.Conn.publish(subject, reply, true, nil, b)
}
// Request will create an Inbox and perform a Request() call

View File

@@ -568,7 +568,6 @@ iter, _ := cons.Messages(jetstream.PullMaxMessages(10), jetstream.PullMaxBytes(1
request. If the value is set too low, the consumer will stall and not be able
to consume messages.
- `PullExpiry(time.Duration)` - timeout on a single pull request to the server
type PullThresholdMessages int
- `PullThresholdMessages(int)` - amount of messages which triggers refilling the
buffer
- `PullThresholdBytes(int)` - amount of bytes which triggers refilling the

View File

@@ -67,6 +67,11 @@ type (
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
//
// NOTE: Fetch has worse performance when used to continuously retrieve
// messages in comparison to Messages or Consume methods, as it does not
// perform any optimizations (e.g. overlapping pull requests) and new
// subscription is created for each execution.
Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
// FetchBytes is used to retrieve up to a provided bytes from the
@@ -88,6 +93,11 @@ type (
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
//
// NOTE: FetchBytes has worse performance when used to continuously
// retrieve messages in comparison to Messages or Consume methods, as it
// does not perform any optimizations (e.g. overlapping pull requests)
// and new subscription is created for each execution.
FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error)
// FetchNoWait is used to retrieve up to a provided number of messages
@@ -102,6 +112,11 @@ type (
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
//
// NOTE: FetchNoWait has worse performance when used to continuously
// retrieve messages in comparison to Messages or Consume methods, as it
// does not perform any optimizations (e.g. overlapping pull requests)
// and new subscription is created for each execution.
FetchNoWait(batch int) (MessageBatch, error)
// Consume will continuously receive messages and handle them

View File

@@ -246,6 +246,11 @@ type (
Mirror *StreamSource `json:"mirror,omitempty"`
// Sources defines the configuration for sources of a KeyValue store.
// If no subject transforms are defined, it is assumed that a source is
// also a KV store and subject transforms will be set to correctly map
// keys from the source KV to the current one. If subject transforms are
// defined, they will be used as is. This allows using non-kv streams as
// sources.
Sources []*StreamSource `json:"sources,omitempty"`
// Compression sets the underlying stream compression.
@@ -471,7 +476,6 @@ const (
kvSubjectsTmpl = "$KV.%s.>"
kvSubjectsPreTmpl = "$KV.%s."
kvSubjectsPreDomainTmpl = "%s.$KV.%s."
kvNoPending = "0"
)
const (
@@ -685,8 +689,14 @@ func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConf
scfg.Mirror = m
scfg.MirrorDirect = true
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
// if subject transforms are already set, then use as is.
// this allows for full control of the source, e.g. using non-KV streams.
// Note that in this case, the Name is not modified and full stream name must be provided.
if len(ss.SubjectTransforms) > 0 {
scfg.Sources = append(scfg.Sources, ss)
continue
}
var sourceBucketName string
if strings.HasPrefix(ss.Name, kvBucketNamePre) {
sourceBucketName = ss.Name[len(kvBucketNamePre):]
@@ -1291,6 +1301,8 @@ func (kv *kvs) WatchFiltered(ctx context.Context, keys []string, opts ...WatchOp
return nil, err
}
sub.SetClosedHandler(func(_ string) {
w.mu.Lock()
defer w.mu.Unlock()
close(w.updates)
})
// If there were no pending messages at the time of the creation

View File

@@ -105,7 +105,11 @@ func (p *pushConsumer) Consume(handler MessageHandler, opts ...PushConsumeOpt) (
}
var err error
sub.subscription, err = p.js.conn.Subscribe(p.info.Config.DeliverSubject, internalHandler)
if p.info.Config.DeliverGroup != "" {
sub.subscription, err = p.js.conn.QueueSubscribe(p.info.Config.DeliverSubject, p.info.Config.DeliverGroup, internalHandler)
} else {
sub.subscription, err = p.js.conn.Subscribe(p.info.Config.DeliverSubject, internalHandler)
}
if err != nil {
return nil, err
}

View File

@@ -1132,7 +1132,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if err != nil {
return nil, err
}
if err := js.nc.publish(m.Subject, reply, hdr, m.Data); err != nil {
if err := js.nc.publish(m.Subject, reply, false, hdr, m.Data); err != nil {
js.clearPAF(id)
return nil, err
}
@@ -3560,7 +3560,7 @@ func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byt
}
if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
if ctrace.ResponseReceived != nil {
ctrace.ResponseReceived(subj, resp.Data, resp.Header)
}
}

View File

@@ -354,7 +354,6 @@ const (
kvSubjectsTmpl = "$KV.%s.>"
kvSubjectsPreTmpl = "$KV.%s."
kvSubjectsPreDomainTmpl = "%s.$KV.%s."
kvNoPending = "0"
)
// Regex for valid keys and buckets.

View File

@@ -48,7 +48,7 @@ import (
// Default Constants
const (
Version = "1.47.0"
Version = "1.48.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -534,6 +534,11 @@ type Options struct {
// WebSocketConnectionHeadersHandler is an optional callback handler for generating token used for WebSocket connections.
WebSocketConnectionHeadersHandler WebSocketHeadersHandler
// SkipSubjectValidation will disable publish subject validation.
// NOTE: This is not recommended in general, as the performance gain is minimal
// and may lead to breaking protocol.
SkipSubjectValidation bool
}
const (
@@ -1512,6 +1517,20 @@ func WebSocketConnectionHeadersHandler(cb WebSocketHeadersHandler) Option {
}
}
// SkipSubjectValidation is an Option to skip subject validation when
// publishing messages.
// By default, subject validation is performed to ensure that subjects
// are valid according to NATS subject syntax (no spaces newlines and tabs).
// NOTE: It is not recommended to use this option as the performance gain
// is minimal and disabling subject validation can lead breaking protocol
// rules.
func SkipSubjectValidation() Option {
return func(o *Options) error {
o.SkipSubjectValidation = true
return nil
}
}
// Handler processing
// SetDisconnectHandler will set the disconnect event handler.
@@ -3916,7 +3935,7 @@ func (nc *Conn) kickFlusher() {
// argument is left untouched and needs to be correctly interpreted on
// the receiver.
func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, nil, data)
return nc.publish(subj, _EMPTY_, false, nil, data)
}
// Header represents the optional Header for a NATS message,
@@ -4059,27 +4078,71 @@ func (nc *Conn) PublishMsg(m *Msg) error {
if err != nil {
return err
}
return nc.publish(m.Subject, m.Reply, hdr, m.Data)
validateReply := m.Reply != _EMPTY_
return nc.publish(m.Subject, m.Reply, validateReply, hdr, m.Data)
}
// PublishRequest will perform a Publish() expecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
return nc.publish(subj, reply, nil, data)
return nc.publish(subj, reply, true, nil, data)
}
// Used for handrolled Itoa
const digits = "0123456789"
// validateSubject checks if the subject contains characters that break the NATS protocol.
// Uses an adaptive algorithm: manual loop for short subjects (< 16 chars) and
// SIMD-optimized strings.IndexByte for longer subjects.
func validateSubject(subj string) error {
if subj == "" {
return ErrBadSubject
}
// Adaptive threshold based on benchmark data showing crossover at ~15-20 characters.
const lengthThreshold = 16
if len(subj) < lengthThreshold {
// Fast path for short subjects (< 16 chars)
// Short-circuit on non-control characters.
for i := range len(subj) {
c := subj[i]
if c <= ' ' && (c == ' ' || c == '\t' || c == '\r' || c == '\n') {
return ErrBadSubject
}
}
return nil
}
// Optimized path for long subjects (>= 16 chars)
// Uses SIMD-optimized strings.IndexByte (processes 16+ bytes per instruction)
if strings.IndexByte(subj, ' ') >= 0 ||
strings.IndexByte(subj, '\t') >= 0 ||
strings.IndexByte(subj, '\r') >= 0 ||
strings.IndexByte(subj, '\n') >= 0 {
return ErrBadSubject
}
return nil
}
// publish is the internal function to publish messages to a nats-server.
// Sends a protocol data message by queuing into the bufio writer
// and kicking the flush go routine. These writes should be protected.
func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {
func (nc *Conn) publish(subj, reply string, validateReply bool, hdr, data []byte) error {
if nc == nil {
return ErrInvalidConnection
}
if subj == "" {
if !nc.Opts.SkipSubjectValidation {
if err := validateSubject(subj); err != nil {
return err
}
if validateReply {
if err := validateSubject(reply); err != nil {
return ErrBadSubject
}
}
} else if subj == _EMPTY_ {
return ErrBadSubject
}
nc.mu.Lock()
@@ -4245,7 +4308,7 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
}
nc.mu.Unlock()
if err := nc.publish(subj, respInbox, hdr, data); err != nil {
if err := nc.publish(subj, respInbox, false, hdr, data); err != nil {
return nil, token, err
}
@@ -4341,7 +4404,7 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration)
s.AutoUnsubscribe(1)
defer s.Unsubscribe()
err = nc.publish(subj, inbox, hdr, data)
err = nc.publish(subj, inbox, false, hdr, data)
if err != nil {
return nil, err
}

2
vendor/modules.txt vendored
View File

@@ -1167,7 +1167,7 @@ github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
github.com/nats-io/nats-server/v2/server/thw
github.com/nats-io/nats-server/v2/server/tpm
# github.com/nats-io/nats.go v1.47.0
# github.com/nats-io/nats.go v1.48.0
## explicit; go 1.23.0
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin