Merge pull request #9970 from owncloud/dependabot/go_modules/github.com/nats-io/nats-server/v2-2.10.20

Bump github.com/nats-io/nats-server/v2 from 2.10.18 to 2.10.20
This commit is contained in:
Michael Barz
2024-09-03 12:24:20 +02:00
committed by GitHub
34 changed files with 750 additions and 544 deletions

4
go.mod
View File

@@ -62,7 +62,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.2.1
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.18
github.com/nats-io/nats-server/v2 v2.10.20
github.com/nats-io/nats.go v1.37.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
@@ -344,7 +344,7 @@ require (
go.uber.org/zap v1.23.0 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect

8
go.sum
View File

@@ -893,8 +893,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.18 h1:tRdZmBuWKVAFYtayqlBB2BuCHNGAQPvoQIXOKwU3WSM=
github.com/nats-io/nats-server/v2 v2.10.18/go.mod h1:97Qyg7YydD8blKlR8yBsUlPlWyZKjA7Bp5cl3MUE9K8=
github.com/nats-io/nats-server/v2 v2.10.20 h1:CXDTYNHeBiAKBTAIP2gjpgbWap2GhATnTLgP8etyvEI=
github.com/nats-io/nats-server/v2 v2.10.20/go.mod h1:hgcPnoUtMfxz1qVOvLZGurVypQ+Cg6GXVXjG53iHk+M=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
@@ -1549,8 +1549,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -15,6 +15,7 @@ package server
import (
"bytes"
"cmp"
"encoding/hex"
"errors"
"fmt"
@@ -25,7 +26,7 @@ import (
"net/http"
"net/textproto"
"reflect"
"sort"
"slices"
"strconv"
"strings"
"sync"
@@ -349,10 +350,8 @@ func (a *Account) updateRemoteServer(m *AccountNumConns) []*client {
// conservative and a bit harsh here. Clients will reconnect if we over compensate.
var clients []*client
if mtce {
clients := a.getClientsLocked()
sort.Slice(clients, func(i, j int) bool {
return clients[i].start.After(clients[j].start)
})
clients = a.getClientsLocked()
slices.SortFunc(clients, func(i, j *client) int { return -i.start.Compare(j.start) }) // reverse
over := (len(a.clients) - int(a.sysclients) + int(a.nrclients)) - int(a.mconns)
if over < len(clients) {
clients = clients[:over]
@@ -669,7 +668,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
}
dests = append(dests, &destination{tr, aw})
}
sort.Slice(dests, func(i, j int) bool { return dests[i].weight < dests[j].weight })
slices.SortFunc(dests, func(i, j *destination) int { return cmp.Compare(i.weight, j.weight) })
var lw uint8
for _, d := range dests {
@@ -1478,11 +1477,13 @@ func (a *Account) addServiceImportWithClaim(destination *Account, from, to strin
}
// Check if this introduces a cycle before proceeding.
if err := a.serviceImportFormsCycle(destination, from); err != nil {
return err
// From will be the mapped subject.
// If the 'to' has a wildcard make sure we pre-transform the 'from' before we check for cycles, e.g. '$1'
fromT := from
if subjectHasWildcard(to) {
fromT, _ = transformUntokenize(from)
}
if err := a.serviceImportFormsCycle(destination, to); err != nil {
if err := a.serviceImportFormsCycle(destination, fromT); err != nil {
return err
}
@@ -1807,7 +1808,7 @@ func (a *Account) _checkForReverseEntry(reply string, si *serviceImport, checkIn
// Note that if we are here reply has to be a literal subject.
if checkInterest {
// If interest still exists we can not clean these up yet.
if rr := a.sl.Match(reply); len(rr.psubs)+len(rr.qsubs) > 0 {
if a.sl.HasInterest(reply) {
a.mu.RUnlock()
return
}
@@ -1925,6 +1926,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
tr *subjectTransform
err error
)
if subjectHasWildcard(to) {
// If to and from match, then we use the published subject.
if to == from {
@@ -3564,9 +3566,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
// Sort if we are over the limit.
if a.MaxTotalConnectionsReached() {
sort.Slice(clients, func(i, j int) bool {
return clients[i].start.After(clients[j].start)
})
slices.SortFunc(clients, func(i, j *client) int { return -i.start.Compare(j.start) }) // sort in reverse order
}
// If JetStream is enabled for this server we will call into configJetStream for the account
@@ -3774,7 +3774,7 @@ func fetchAccount(res AccountResolver, name string) (string, error) {
if !nkeys.IsValidPublicAccountKey(name) {
return _EMPTY_, fmt.Errorf("will only fetch valid account keys")
}
return res.Fetch(name)
return res.Fetch(copyString(name))
}
// AccountResolver interface. This is to fetch Account JWTs by public nkeys

View File

@@ -857,33 +857,6 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
// If we have a jwt and a userClaim, make sure we have the Account, etc associated.
// We need to look up the account. This will use an account resolver if one is present.
if juc != nil {
allowedConnTypes, err := convertAllowedConnectionTypes(juc.AllowedConnectionTypes)
if err != nil {
// We got an error, which means some connection types were unknown. As long as
// a valid one is returned, we proceed with auth. If not, we have to reject.
// In other words, suppose that JWT allows "WEBSOCKET" in the array. No error
// is returned and allowedConnTypes will contain "WEBSOCKET" only.
// Client will be rejected if not a websocket client, or proceed with rest of
// auth if it is.
// Now suppose JWT allows "WEBSOCKET, MQTT" and say MQTT is not known by this
// server. In this case, allowedConnTypes would contain "WEBSOCKET" and we
// would get `err` indicating that "MQTT" is an unknown connection type.
// If a websocket client connects, it should still be allowed, since after all
// the admin wanted to allow websocket and mqtt connection types.
// However, say that the JWT only allows "MQTT" (and again suppose this server
// does not know about MQTT connection type), then since the allowedConnTypes
// map would be empty (no valid types found), and since empty means allow-all,
// then we should reject because the intent was to allow connections for this
// user only as an MQTT client.
c.Debugf("%v", err)
if len(allowedConnTypes) == 0 {
return false
}
}
if !c.connectionTypeAllowed(allowedConnTypes) {
c.Debugf("Connection type not allowed")
return false
}
issuer := juc.Issuer
if juc.IssuerAccount != _EMPTY_ {
issuer = juc.IssuerAccount
@@ -926,6 +899,36 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
c.Debugf("Account does not allow bearer tokens")
return false
}
// We check the allowed connection types, but only after processing
// of scoped signer (so that it updates `juc` with what is defined
// in the account.
allowedConnTypes, err := convertAllowedConnectionTypes(juc.AllowedConnectionTypes)
if err != nil {
// We got an error, which means some connection types were unknown. As long as
// a valid one is returned, we proceed with auth. If not, we have to reject.
// In other words, suppose that JWT allows "WEBSOCKET" in the array. No error
// is returned and allowedConnTypes will contain "WEBSOCKET" only.
// Client will be rejected if not a websocket client, or proceed with rest of
// auth if it is.
// Now suppose JWT allows "WEBSOCKET, MQTT" and say MQTT is not known by this
// server. In this case, allowedConnTypes would contain "WEBSOCKET" and we
// would get `err` indicating that "MQTT" is an unknown connection type.
// If a websocket client connects, it should still be allowed, since after all
// the admin wanted to allow websocket and mqtt connection types.
// However, say that the JWT only allows "MQTT" (and again suppose this server
// does not know about MQTT connection type), then since the allowedConnTypes
// map would be empty (no valid types found), and since empty means allow-all,
// then we should reject because the intent was to allow connections for this
// user only as an MQTT client.
c.Debugf("%v", err)
if len(allowedConnTypes) == 0 {
return false
}
}
if !c.connectionTypeAllowed(allowedConnTypes) {
c.Debugf("Connection type not allowed")
return false
}
// skip validation of nonce when presented with a bearer token
// FIXME: if BearerToken is only for WSS, need check for server with that port enabled
if !juc.BearerToken {
@@ -978,12 +981,12 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
deniedSub := []string{}
for _, sub := range denyAllJs {
if c.perms.pub.deny != nil {
if r := c.perms.pub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 {
if c.perms.pub.deny.HasInterest(sub) {
deniedPub = append(deniedPub, sub)
}
}
if c.perms.sub.deny != nil {
if r := c.perms.sub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 {
if c.perms.sub.deny.HasInterest(sub) {
deniedSub = append(deniedSub, sub)
}
}

View File

@@ -14,10 +14,11 @@
package avl
import (
"cmp"
"encoding/binary"
"errors"
"math/bits"
"sort"
"slices"
)
// SequenceSet is a memory and encoding optimized set for storing unsigned ints.
@@ -209,7 +210,7 @@ func Union(ssa ...*SequenceSet) *SequenceSet {
return nil
}
// Sort so we can clone largest.
sort.Slice(ssa, func(i, j int) bool { return ssa[i].Size() > ssa[j].Size() })
slices.SortFunc(ssa, func(i, j *SequenceSet) int { return -cmp.Compare(i.Size(), j.Size()) }) // reverse order
ss := ssa[0].Clone()
// Insert the rest through range call.

View File

@@ -1,4 +1,4 @@
// Copyright 2023 The NATS Authors
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -222,7 +222,7 @@ func CertOCSPEligible(link *ChainLink) bool {
if link == nil || link.Leaf.Raw == nil || len(link.Leaf.Raw) == 0 {
return false
}
if link.Leaf.OCSPServer == nil || len(link.Leaf.OCSPServer) == 0 {
if len(link.Leaf.OCSPServer) == 0 {
return false
}
urls := getWebEndpoints(link.Leaf.OCSPServer)

View File

@@ -1,4 +1,4 @@
// Copyright 2023 The NATS Authors
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -15,6 +15,7 @@ package certidp
import (
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
@@ -26,7 +27,7 @@ import (
func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte, error) {
if link == nil || link.Leaf == nil || link.Issuer == nil || opts == nil || log == nil {
return nil, fmt.Errorf(ErrInvalidChainlink)
return nil, errors.New(ErrInvalidChainlink)
}
timeout := time.Duration(opts.Timeout * float64(time.Second))
@@ -59,7 +60,7 @@ func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte,
responders := *link.OCSPWebEndpoints
if len(responders) == 0 {
return nil, fmt.Errorf(ErrNoAvailOCSPServers)
return nil, errors.New(ErrNoAvailOCSPServers)
}
var raw []byte

View File

@@ -115,19 +115,38 @@ var (
winMyStore = winWide("MY")
// These DLLs must be available on all Windows hosts
winCrypt32 = windows.MustLoadDLL("crypt32.dll")
winNCrypt = windows.MustLoadDLL("ncrypt.dll")
winCrypt32 = windows.NewLazySystemDLL("crypt32.dll")
winNCrypt = windows.NewLazySystemDLL("ncrypt.dll")
winCertFindCertificateInStore = winCrypt32.MustFindProc("CertFindCertificateInStore")
winCryptAcquireCertificatePrivateKey = winCrypt32.MustFindProc("CryptAcquireCertificatePrivateKey")
winNCryptExportKey = winNCrypt.MustFindProc("NCryptExportKey")
winNCryptOpenStorageProvider = winNCrypt.MustFindProc("NCryptOpenStorageProvider")
winNCryptGetProperty = winNCrypt.MustFindProc("NCryptGetProperty")
winNCryptSignHash = winNCrypt.MustFindProc("NCryptSignHash")
winCertFindCertificateInStore = winCrypt32.NewProc("CertFindCertificateInStore")
winCryptAcquireCertificatePrivateKey = winCrypt32.NewProc("CryptAcquireCertificatePrivateKey")
winNCryptExportKey = winNCrypt.NewProc("NCryptExportKey")
winNCryptOpenStorageProvider = winNCrypt.NewProc("NCryptOpenStorageProvider")
winNCryptGetProperty = winNCrypt.NewProc("NCryptGetProperty")
winNCryptSignHash = winNCrypt.NewProc("NCryptSignHash")
winFnGetProperty = winGetProperty
)
func init() {
// Eagerly load the lazily-declared DLLs and resolve every proc at package
// init so that a missing DLL or missing export panics at startup. This
// preserves the fail-fast behavior of the MustLoadDLL/MustFindProc calls
// this code replaced, while using NewLazySystemDLL for safe system-dir
// loading.
for _, d := range []*windows.LazyDLL{
winCrypt32, winNCrypt,
} {
if err := d.Load(); err != nil {
panic(err)
}
}
// Resolve each procedure address now rather than on first call.
for _, p := range []*windows.LazyProc{
winCertFindCertificateInStore, winCryptAcquireCertificatePrivateKey,
winNCryptExportKey, winNCryptOpenStorageProvider,
winNCryptGetProperty, winNCryptSignHash,
} {
if err := p.Find(); err != nil {
panic(err)
}
}
}
type winPKCS1PaddingInfo struct {
pszAlgID *uint16
}

View File

@@ -312,6 +312,8 @@ type outbound struct {
cw *s2.Writer
}
const nbMaxVectorSize = 1024 // == IOV_MAX on Linux/Darwin and most other Unices (except Solaris/AIX)
const nbPoolSizeSmall = 512 // Underlying array size of small buffer
const nbPoolSizeMedium = 4096 // Underlying array size of medium buffer
const nbPoolSizeLarge = 65536 // Underlying array size of large buffer
@@ -1611,7 +1613,7 @@ func (c *client) flushOutbound() bool {
// referenced in c.out.nb (which can be modified in queueOutbound() while
// the lock is released).
c.out.wnb = append(c.out.wnb, collapsed...)
var _orig [1024][]byte
var _orig [nbMaxVectorSize][]byte
orig := append(_orig[:0], c.out.wnb...)
// Since WriteTo is lopping things off the beginning, we need to remember
@@ -1622,13 +1624,31 @@ func (c *client) flushOutbound() bool {
// flush here
start := time.Now()
// FIXME(dlc) - writev will do multiple IOs past 1024 on
// most platforms, need to account for that with deadline?
nc.SetWriteDeadline(start.Add(wdl))
var n int64 // Total bytes written
var wn int64 // Bytes written per loop
var err error // Error from last write, if any
for len(c.out.wnb) > 0 {
// Limit the number of vectors to no more than nbMaxVectorSize,
// which if 1024, will mean a maximum of 64MB in one go.
wnb := c.out.wnb
if len(wnb) > nbMaxVectorSize {
wnb = wnb[:nbMaxVectorSize]
}
consumed := len(wnb)
// Actual write to the socket.
n, err := c.out.wnb.WriteTo(nc)
nc.SetWriteDeadline(time.Time{})
// Actual write to the socket.
nc.SetWriteDeadline(start.Add(wdl))
wn, err = wnb.WriteTo(nc)
nc.SetWriteDeadline(time.Time{})
// Update accounting, move wnb slice onwards if needed, or stop
// if a write error was reported that wasn't a short write.
n += wn
c.out.wnb = c.out.wnb[consumed-len(wnb):]
if err != nil && err != io.ErrShortWrite {
break
}
}
lft := time.Since(start)
@@ -1810,7 +1830,9 @@ func (c *client) markConnAsClosed(reason ClosedState) {
if nc := c.nc; nc != nil && c.srv != nil {
// TODO: May want to send events to single go routine instead
// of creating a new go routine for each save.
go c.srv.saveClosedClient(c, nc, reason)
// Pass the c.subs as a reference. It may be set to nil in
// closeConnection.
go c.srv.saveClosedClient(c, nc, c.subs, reason)
}
}
// If writeLoop exists, let it do the final flush, close and teardown.
@@ -2950,7 +2972,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri
if err := im.acc.sl.Insert(&nsub); err != nil {
errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
c.Debugf(errs)
return nil, fmt.Errorf(errs)
return nil, errors.New(errs)
}
// Update our route map here. But only if we are not a leaf node or a hub leafnode.
@@ -3886,9 +3908,8 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
// Match may use the subject here to populate a cache, so can not use bytesToString here.
r = acc.sl.Match(string(c.pa.subject))
if len(r.psubs)+len(r.qsubs) > 0 {
c.in.results[string(c.pa.subject)] = r
// Prune the results cache. Keeps us from unbounded growth. Random delete.
if len(c.in.results) > maxResultCacheSize {
if len(c.in.results) >= maxResultCacheSize {
n := 0
for subject := range c.in.results {
delete(c.in.results, subject)
@@ -3897,6 +3918,8 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
}
}
}
// Then add the new cache entry.
c.in.results[string(c.pa.subject)] = r
}
}
@@ -3964,7 +3987,7 @@ func (c *client) subForReply(reply []byte) *subscription {
func (c *client) handleGWReplyMap(msg []byte) bool {
// Check for leaf nodes
if c.srv.gwLeafSubs.Count() > 0 {
if r := c.srv.gwLeafSubs.Match(string(c.pa.subject)); len(r.psubs) > 0 {
if r := c.srv.gwLeafSubs.MatchBytes(c.pa.subject); len(r.psubs) > 0 {
c.processMsgResults(c.acc, r, msg, c.pa.deliver, c.pa.subject, c.pa.reply, pmrNoFlag)
}
}
@@ -5284,6 +5307,14 @@ func (c *client) closeConnection(reason ClosedState) {
}
}
// Now that we are done with subscriptions, clear the field so that the
// connection can be released and gc'ed.
if kind == CLIENT || kind == LEAF {
c.mu.Lock()
c.subs = nil
c.mu.Unlock()
}
// Don't reconnect connections that have been marked with
// the no reconnect flag.
if noReconnect {
@@ -5436,17 +5467,19 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
if !ok {
if c.kind == ROUTER && len(c.route.accName) > 0 {
acc = c.acc
if acc = c.acc; acc == nil {
return nil, nil
}
} else {
// Match correct account and sublist.
if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil {
if acc, _ = c.srv.LookupAccount(bytesToString(c.pa.account)); acc == nil {
return nil, nil
}
}
sl := acc.sl
// Match against the account sublist.
r = sl.Match(string(c.pa.subject))
r = sl.MatchBytes(c.pa.subject)
// Check if we need to prune.
if len(c.in.pacache) >= maxPerAccountCacheSize {
@@ -5824,6 +5857,14 @@ func (c *client) Warnf(format string, v ...any) {
c.srv.Warnf(format, v...)
}
// rateLimitFormatWarnf logs a warning at most once per unique format string:
// the format is used as the key into the server's rateLimitLogging map, and
// any call whose format was already stored is dropped. The stored value is a
// timestamp — presumably used for expiry by other code; not expired here,
// TODO confirm.
func (c *client) rateLimitFormatWarnf(format string, v ...any) {
if _, loaded := c.srv.rateLimitLogging.LoadOrStore(format, time.Now()); loaded {
// This format was logged before; suppress the duplicate.
return
}
statement := fmt.Sprintf(format, v...)
c.Warnf("%s", statement)
}
func (c *client) RateLimitWarnf(format string, v ...any) {
// Do the check before adding the client info to the format...
statement := fmt.Sprintf(format, v...)

View File

@@ -55,7 +55,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.10.18"
VERSION = "2.10.20"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -21,7 +21,7 @@ import (
"fmt"
"math/rand"
"reflect"
"sort"
"slices"
"strconv"
"strings"
"sync"
@@ -712,7 +712,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
mset.mu.RLock()
s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc
retention := cfg.Retention
mset.mu.RUnlock()
// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
@@ -735,7 +734,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Make sure we have sane defaults. Do so with the JS lock, otherwise a
// badly timed meta snapshot can result in a race condition.
mset.js.mu.Lock()
setConsumerConfigDefaults(config, &mset.cfg, srvLim, selectedLimits)
setConsumerConfigDefaults(config, &cfg, srvLim, selectedLimits)
mset.js.mu.Unlock()
if err := checkConsumerCfg(config, srvLim, &cfg, acc, selectedLimits, isRecovering); err != nil {
@@ -781,7 +780,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerAlreadyExistsError()
}
// Check for overlapping subjects if we are a workqueue
if mset.cfg.Retention == WorkQueuePolicy {
if cfg.Retention == WorkQueuePolicy {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if !mset.partitionUnique(cName, subjects) {
return nil, NewJSConsumerWQConsumerNotUniqueError()
@@ -803,7 +802,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// but if not we use the value from account limits, if account limits is more restrictive
// than stream config we prefer the account limits to handle cases where account limits are
// updated during the lifecycle of the stream
maxc := mset.cfg.MaxConsumers
maxc := cfg.MaxConsumers
if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) {
maxc = selectedLimits.MaxConsumers
}
@@ -813,7 +812,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
// Check on stream type conflicts with WorkQueues.
if mset.cfg.Retention == WorkQueuePolicy && !config.Direct {
if cfg.Retention == WorkQueuePolicy && !config.Direct {
// Force explicit acks here.
if config.AckPolicy != AckExplicit {
mset.mu.Unlock()
@@ -871,7 +870,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
sfreq: int32(sampleFreq),
maxdc: uint64(config.MaxDeliver),
maxp: config.MaxAckPending,
retention: retention,
retention: cfg.Retention,
created: time.Now().UTC(),
}
@@ -904,17 +903,17 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}
// Create ackMsgs queue now that we have a consumer name
o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name))
o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, cfg.Name))
// Create our request waiting queue.
if o.isPullMode() {
o.waiting = newWaitQueue(config.MaxWaiting)
// Create our internal queue for next msg requests.
o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name))
o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, cfg.Name))
}
// already under lock, mset.Name() would deadlock
o.stream = mset.cfg.Name
o.stream = cfg.Name
o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name
o.nakEventT = JSAdvisoryConsumerMsgNakPre + "." + o.stream + "." + o.name
o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name
@@ -999,7 +998,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// that to scanf them back in.
// Escape '%' in consumer and stream names, as `pre` is used as a template later
// in consumer.ackReply(), resulting in erroneous formatting of the ack subject.
mn := strings.ReplaceAll(mset.cfg.Name, "%", "%%")
mn := strings.ReplaceAll(cfg.Name, "%", "%%")
pre := fmt.Sprintf(jsAckT, mn, strings.ReplaceAll(o.name, "%", "%%"))
o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
@@ -1011,9 +1010,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if o.isPushMode() {
// Check if we are running only 1 replica and that the delivery subject has interest.
// Check in place here for interest. Will setup properly in setLeader.
if config.replicas(&mset.cfg) == 1 {
r := o.acc.sl.Match(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
if config.replicas(&cfg) == 1 {
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(interest) {
// Let the interest come to us eventually, but setup delete timer.
o.updateDeliveryInterest(false)
}
@@ -1193,7 +1192,7 @@ func (o *consumer) setLeader(isLeader bool) {
}
mset.mu.RLock()
s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq
s, jsa, stream, lseq := mset.srv, mset.jsa, mset.getCfgName(), mset.lseq
mset.mu.RUnlock()
o.mu.Lock()
@@ -1508,7 +1507,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool {
gw.RLock()
defer gw.RUnlock()
for _, gwc := range gw.outo {
psi, qr := gwc.gatewayInterest(account, subject)
psi, qr := gwc.gatewayInterest(account, stringToBytes(subject))
if psi || qr != nil {
return true
}
@@ -1697,7 +1696,7 @@ func (o *consumer) forceExpirePending() {
}
}
if len(expired) > 0 {
sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] })
slices.Sort(expired)
o.addToRedeliverQueue(expired...)
// Now we should update the timestamp here since we are redelivering.
// We will use an incrementing time to preserve order for any other redelivery.
@@ -1745,6 +1744,8 @@ func (o *consumer) setRateLimit(bps uint64) {
// Burst should be set to maximum msg size for this account, etc.
var burst int
// We don't need to get cfgMu's rlock here since this function
// is already invoked under mset.mu.RLock(), which supersedes cfgMu.
if mset.cfg.MaxMsgSize > 0 {
burst = int(mset.cfg.MaxMsgSize)
} else if mset.jsa.account.limits.mpay > 0 {
@@ -1802,8 +1803,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
if ncfg.DeliverSubject == _EMPTY_ {
return errors.New("can not update push consumer to pull based")
}
rr := acc.sl.Match(cfg.DeliverSubject)
if len(rr.psubs)+len(rr.qsubs) != 0 {
if acc.sl.HasInterest(cfg.DeliverSubject) {
return NewJSConsumerNameExistError()
}
}
@@ -2028,9 +2028,10 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
o.processAckMsg(sseq, dseq, dc, reply, true)
// We handle replies for acks in updateAcks
skipAckReply = true
if !o.processAckMsg(sseq, dseq, dc, reply, true) {
// We handle replies for acks in updateAcks
skipAckReply = true
}
case bytes.HasPrefix(msg, AckNext):
o.processAckMsg(sseq, dseq, dc, _EMPTY_, true)
o.processNextMsgRequest(reply, msg[len(AckNext):])
@@ -2044,9 +2045,10 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
if buf := msg[len(AckTerm):]; len(buf) > 0 {
reason = string(bytes.TrimSpace(buf))
}
o.processTerm(sseq, dseq, dc, reason, reply)
// We handle replies for acks in updateAcks
skipAckReply = true
if !o.processTerm(sseq, dseq, dc, reason, reply) {
// We handle replies for acks in updateAcks
skipAckReply = true
}
}
// Ack the ack if requested.
@@ -2396,9 +2398,12 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
}
// Process a TERM
func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) {
// Returns `true` if the ack was processed in place and the sender can now respond
// to the client, or `false` if there was an error or the ack is replicated (in which
// case the reply will be sent later).
func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool {
// Treat like an ack to suppress redelivery.
o.processAckMsg(sseq, dseq, dc, reply, false)
ackedInPlace := o.processAckMsg(sseq, dseq, dc, reply, false)
o.mu.Lock()
defer o.mu.Unlock()
@@ -2421,11 +2426,14 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) {
j, err := json.Marshal(e)
if err != nil {
return
// We had an error during the marshal, so we can't send the advisory,
// but we still need to tell the caller that the ack was processed.
return ackedInPlace
}
subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
return ackedInPlace
}
// Introduce a small delay in when timer fires to check pending.
@@ -2458,7 +2466,7 @@ func (o *consumer) checkRedelivered(slseq uint64) {
if shouldUpdateState {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err)
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.getCfgName(), name, err)
}
}
}
@@ -2734,11 +2742,15 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
o.sendAdvisory(o.ackEventT, j)
}
func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) {
// Process an ACK.
// Returns `true` if the ack was processed in place and the sender can now respond
// to the client, or `false` if there was an error or the ack is replicated (in which
// case the reply will be sent later).
func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) bool {
o.mu.Lock()
if o.closed {
o.mu.Unlock()
return
return false
}
// Check if this ack is above the current pointer to our next to deliver.
@@ -2750,9 +2762,14 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
mset := o.mset
if mset == nil || mset.closed.Load() {
o.mu.Unlock()
return
return false
}
// Let the owning stream know if we are interest or workqueue retention based.
// If this consumer is clustered (o.node != nil) this will be handled by
// processReplicatedAck after the ack has propagated.
ackInPlace := o.node == nil && o.retention != LimitsPolicy
var sgap, floor uint64
var needSignal bool
@@ -2791,7 +2808,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
// no-op
if dseq <= o.adflr || sseq <= o.asflr {
o.mu.Unlock()
return
return ackInPlace
}
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
@@ -2823,22 +2840,19 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
case AckNone:
// FIXME(dlc) - This is error but do we care?
o.mu.Unlock()
return
return ackInPlace
}
// No ack replication, so we set reply to "" so that updateAcks does not
// send the reply. The caller will.
if ackInPlace {
reply = _EMPTY_
}
// Update underlying store.
o.updateAcks(dseq, sseq, reply)
// In case retention changes for a stream, this ought to have been updated
// using the consumer lock to avoid a race.
retention := o.retention
clustered := o.node != nil
o.mu.Unlock()
// Let the owning stream know if we are interest or workqueue retention based.
// If this consumer is clustered this will be handled by processReplicatedAck
// after the ack has propagated.
if !clustered && mset != nil && retention != LimitsPolicy {
if ackInPlace {
if sgap > 1 {
// FIXME(dlc) - This can very inefficient, will need to fix.
for seq := sseq; seq >= floor; seq-- {
@@ -2853,6 +2867,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
if needSignal {
o.signalNewMessages()
}
return ackInPlace
}
// Determine if this is a truly filtered consumer. Modern clients will place filtered subjects
@@ -2869,16 +2884,21 @@ func (o *consumer) isFiltered() bool {
return true
}
// Protect access to mset.cfg with the cfgMu mutex.
mset.cfgMu.RLock()
msetSubjects := mset.cfg.Subjects
mset.cfgMu.RUnlock()
// `isFiltered` needs to be performant, so we do
// as many cheap early checks as possible to avoid unnecessary work.
// Here we avoid iteration over slices if there is only one subject in stream
// and one filter for the consumer.
if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 {
return mset.cfg.Subjects[0] != o.subjf[0].subject
if len(msetSubjects) == 1 && len(o.subjf) == 1 {
return msetSubjects[0] != o.subjf[0].subject
}
// if the list is not equal length, we can return early, as this is filtered.
if len(mset.cfg.Subjects) != len(o.subjf) {
if len(msetSubjects) != len(o.subjf) {
return true
}
@@ -2890,7 +2910,7 @@ func (o *consumer) isFiltered() bool {
for _, val := range o.subjf {
cfilters[val.subject] = struct{}{}
}
for _, val := range mset.cfg.Subjects {
for _, val := range msetSubjects {
if _, ok := cfilters[val]; !ok {
return true
}
@@ -2898,6 +2918,28 @@ func (o *consumer) isFiltered() bool {
return false
}
// Check if we would have matched and needed an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) matchAck(sseq uint64) bool {
	o.mu.RLock()
	defer o.mu.RUnlock()

	// Unfiltered consumers are interested in every sequence.
	if !o.isFiltered() {
		return true
	}
	// Filtered consumer: we can only match if we can load the message
	// and its subject passes our filter set.
	if o.mset == nil {
		return false
	}
	var sm StoreMsg
	if _, err := o.mset.store.LoadMsg(sseq, &sm); err != nil {
		return false
	}
	return o.isFilteredMatch(sm.subj)
}
// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) needAck(sseq uint64, subj string) bool {
@@ -3199,8 +3241,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
if wr.expires.IsZero() || time.Now().Before(wr.expires) {
rr := wr.acc.sl.Match(wr.interest)
if len(rr.psubs)+len(rr.qsubs) > 0 {
if wr.acc.sl.HasInterest(wr.interest) {
return o.waiting.pop()
} else if time.Since(wr.received) < defaultGatewayRecentSubExpiration && (o.srv.leafNodeEnabled || o.srv.gateway.enabled) {
return o.waiting.pop()
@@ -3627,8 +3668,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
continue
}
// Now check interest.
rr := wr.acc.sl.Match(wr.interest)
interest := len(rr.psubs)+len(rr.qsubs) > 0
interest := wr.acc.sl.HasInterest(wr.interest)
if !interest && (s.leafNodeEnabled || s.gateway.enabled) {
// If we are here check on gateways and leaf nodes (as they can mask gateways on the other end).
// If we have interest or the request is too young break and do not expire.
@@ -3907,10 +3947,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
inch := o.inch
o.mu.Unlock()
// Grab the stream's retention policy
mset.mu.RLock()
rp := mset.cfg.Retention
mset.mu.RUnlock()
// Grab the stream's retention policy and name
mset.cfgMu.RLock()
stream, rp := mset.cfg.Name, mset.cfg.Retention
mset.cfgMu.RUnlock()
var err error
@@ -3966,12 +4006,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
goto waitForMsgs
} else if err == errPartialCache {
s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name)
o.mset.acc, stream, o.cfg.Name)
goto waitForMsgs
} else {
s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err)
o.mset.acc, stream, o.cfg.Name, err)
goto waitForMsgs
}
}
@@ -4622,7 +4662,7 @@ func (o *consumer) checkPending() {
if len(expired) > 0 {
// We need to sort.
sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] })
slices.Sort(expired)
o.addToRedeliverQueue(expired...)
// Now we should update the timestamp here since we are redelivering.
// We will use an incrementing time to preserve order for any other redelivery.
@@ -4656,7 +4696,7 @@ func (o *consumer) checkPending() {
if shouldUpdateState {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err)
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.getCfgName(), name, err)
}
}
}
@@ -4777,7 +4817,10 @@ func (o *consumer) selectStartingSeqNo() {
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
// If our parent stream is set to max msgs per subject of 1 this is just
// a normal consumer at this point. We can avoid any heavy lifting.
if o.mset.cfg.MaxMsgsPer == 1 {
o.mset.cfgMu.RLock()
mmp := o.mset.cfg.MaxMsgsPer
o.mset.cfgMu.RUnlock()
if mmp == 1 {
o.sseq = state.FirstSeq
} else {
// A threshold for when we switch from get last msg to subjects state.
@@ -4807,9 +4850,7 @@ func (o *consumer) selectStartingSeqNo() {
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
slices.Sort(lss.seqs)
}
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
@@ -4848,12 +4889,16 @@ func (o *consumer) selectStartingSeqNo() {
o.sseq = o.cfg.OptStartSeq
}
if state.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
// Only clip the sseq if the OptStartSeq is not provided, otherwise
// it's possible that the stream just doesn't contain OptStartSeq yet.
if o.cfg.OptStartSeq == 0 {
if state.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
}
}
@@ -4932,9 +4977,9 @@ func (o *consumer) isActive() bool {
// hasNoLocalInterest return true if we have no local interest.
// NOTE(review): this span contained both the pre- and post-change lines of the
// diff (duplicate assignment and duplicate return), which is not valid Go.
// Collapsed to the post-change version that uses the cheaper HasInterest
// check instead of materializing a full SublistResult via Match.
func (o *consumer) hasNoLocalInterest() bool {
	o.mu.RLock()
	// Snapshot the interest check under the read lock.
	interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
	o.mu.RUnlock()
	return !interest
}
// This is when the underlying stream has been purged.
@@ -5168,6 +5213,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
if mset != nil {
mset.mu.Lock()
mset.removeConsumer(o)
// No need for cfgMu's lock since mset.mu.Lock superseeds it.
rp = mset.cfg.Retention
mset.mu.Unlock()
}
@@ -5298,13 +5344,13 @@ func (o *consumer) switchToEphemeral() {
o.mu.Lock()
o.cfg.Durable = _EMPTY_
store, ok := o.store.(*consumerFileStore)
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
// Setup dthresh.
o.updateInactiveThreshold(&o.cfg)
o.mu.Unlock()
// Update interest
o.updateDeliveryInterest(len(rr.psubs)+len(rr.qsubs) > 0)
o.updateDeliveryInterest(interest)
// Write out new config
if ok {
store.updateConfig(o.cfg)
@@ -5499,7 +5545,9 @@ func (o *consumer) checkStateForInterestStream() error {
}
for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
mset.ackMsg(o, seq)
if o.matchAck(seq) {
mset.ackMsg(o, seq)
}
}
o.mu.RLock()

View File

@@ -180,6 +180,9 @@ var (
// ErrClusterNameRemoteConflict signals that a remote server has a different cluster name.
ErrClusterNameRemoteConflict = errors.New("cluster name from remote server conflicts")
// ErrClusterNameHasSpaces signals that the cluster name contains spaces, which is not allowed.
ErrClusterNameHasSpaces = errors.New("cluster name cannot contain spaces or new lines")
// ErrMalformedSubject is returned when a subscription is made with a subject that does not conform to subject rules.
ErrMalformedSubject = errors.New("malformed subject")

View File

@@ -130,6 +130,7 @@ type internal struct {
replies map[string]msgHandler
sendq *ipQueue[*pubMsg]
recvq *ipQueue[*inSysMsg]
recvqp *ipQueue[*inSysMsg] // For STATSZ/Pings
resetCh chan struct{}
wg sync.WaitGroup
sq *sendq
@@ -412,15 +413,7 @@ type TypedEvent struct {
// internalReceiveLoop will be responsible for dispatching all messages that
// a server receives and needs to internally process, e.g. internal subs.
func (s *Server) internalReceiveLoop() {
s.mu.RLock()
if s.sys == nil || s.sys.recvq == nil {
s.mu.RUnlock()
return
}
recvq := s.sys.recvq
s.mu.RUnlock()
func (s *Server) internalReceiveLoop(recvq *ipQueue[*inSysMsg]) {
for s.eventsRunning() {
select {
case <-recvq.ch:
@@ -1139,7 +1132,7 @@ func (s *Server) initEventTracking() {
}
// Listen for ping messages that will be sent to all servers for statsz.
// This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil {
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallbackStatsz(s.statszReq)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
@@ -1182,23 +1175,42 @@ func (s *Server) initEventTracking() {
optz := &HealthzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.healthz(&optz.HealthzOptions), nil })
},
"PROFILEZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &ProfilezEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.profilez(&optz.ProfilezOptions), nil })
},
"PROFILEZ": nil, // Special case, see below
"EXPVARZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &ExpvarzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.expvarz(optz), nil })
},
}
profilez := func(_ *subscription, c *client, _ *Account, _, rply string, rmsg []byte) {
hdr, msg := c.msgParts(rmsg)
// Need to copy since we are passing those to the go routine below.
hdr, msg = copyBytes(hdr), copyBytes(msg)
// Execute in its own go routine because CPU profiling, for instance,
// could take several seconds to complete.
go func() {
optz := &ProfilezEventOptions{}
s.zReq(c, rply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) {
return s.profilez(&optz.ProfilezOptions), nil
})
}()
}
for name, req := range monSrvc {
var h msgHandler
switch name {
case "PROFILEZ":
h = profilez
case "STATSZ":
h = s.noInlineCallbackStatsz(req)
default:
h = s.noInlineCallback(req)
}
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
if _, err := s.sysSubscribe(subject, h); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
subject = fmt.Sprintf(serverPingReqSubj, name)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
if _, err := s.sysSubscribe(subject, h); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
@@ -2451,16 +2463,39 @@ func (s *Server) sendAccountAuthErrorEvent(c *client, acc *Account, reason strin
// rmsg contains header and the message. use client.msgParts(rmsg) to split them apart
type msgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, rmsg []byte)
const (
	// recvQMuxed selects the general muxed internal receive queue (s.sys.recvq).
	recvQMuxed = 1
	// recvQStatsz selects the dedicated receive queue for STATSZ/ping traffic (s.sys.recvqp).
	recvQStatsz = 2
)
// Create a wrapped callback handler for the subscription that will move it to an
// internal recvQ for processing not inline with routes etc.
// This targets the general muxed queue (recvQMuxed).
func (s *Server) noInlineCallback(cb sysMsgHandler) msgHandler {
	return s.noInlineCallbackRecvQSelect(cb, recvQMuxed)
}
// Create a wrapped callback handler for the subscription that will move it to an
// internal recvQ for Statsz/Pings for processing not inline with routes etc.
// This targets the dedicated STATSZ/ping queue (recvQStatsz) so that statsz
// traffic does not queue behind other internal system messages.
func (s *Server) noInlineCallbackStatsz(cb sysMsgHandler) msgHandler {
	return s.noInlineCallbackRecvQSelect(cb, recvQStatsz)
}
// Create a wrapped callback handler for the subscription that will move it to an
// internal IPQueue for processing not inline with routes etc.
func (s *Server) noInlineCallbackRecvQSelect(cb sysMsgHandler, recvQSelect int) msgHandler {
s.mu.RLock()
if !s.eventsEnabled() {
s.mu.RUnlock()
return nil
}
// Capture here for direct reference to avoid any unnecessary blocking inline with routes, gateways etc.
recvq := s.sys.recvq
var recvq *ipQueue[*inSysMsg]
switch recvQSelect {
case recvQStatsz:
recvq = s.sys.recvqp
default:
recvq = s.sys.recvq
}
s.mu.RUnlock()
return func(sub *subscription, c *client, acc *Account, subj, rply string, rmsg []byte) {

View File

@@ -27,10 +27,12 @@ import (
"fmt"
"hash"
"io"
"io/fs"
"math"
"net"
"os"
"path/filepath"
"slices"
"sort"
"strings"
"sync"
@@ -237,6 +239,7 @@ type msgBlock struct {
noTrack bool
needSync bool
syncAlways bool
noCompact bool
closed bool
// Used to mock write failures.
@@ -765,9 +768,7 @@ func (fs *fileStore) setupAEK() error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
@@ -803,9 +804,7 @@ func (fs *fileStore) writeStreamMeta() error {
b = fs.aek.Seal(nonce, nonce, b, nil)
}
<-dios
err = os.WriteFile(meta, b, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(meta, b, defaultFilePerms)
if err != nil {
return err
}
@@ -813,9 +812,7 @@ func (fs *fileStore) writeStreamMeta() error {
fs.hh.Write(b)
checksum := hex.EncodeToString(fs.hh.Sum(nil))
sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
<-dios
err = os.WriteFile(sum, []byte(checksum), defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms)
if err != nil {
return err
}
@@ -1074,7 +1071,7 @@ func (fs *fileStore) addLostData(ld *LostStreamData) {
}
if added {
msgs := fs.ld.Msgs
sort.Slice(msgs, func(i, j int) bool { return msgs[i] < msgs[j] })
slices.Sort(msgs)
fs.ld.Bytes += ld.Bytes
}
} else {
@@ -1084,17 +1081,10 @@ func (fs *fileStore) addLostData(ld *LostStreamData) {
// Helper to see if we already have this sequence reported in our lost data.
func (ld *LostStreamData) exists(seq uint64) (int, bool) {
i, found := sort.Find(len(ld.Msgs), func(i int) int {
tseq := ld.Msgs[i]
if tseq < seq {
return -1
}
if tseq > seq {
return +1
}
return 0
i := slices.IndexFunc(ld.Msgs, func(i uint64) bool {
return i == seq
})
return i, found
return i, i > -1
}
func (fs *fileStore) removeFromLostData(seq uint64) {
@@ -1206,9 +1196,7 @@ func (mb *msgBlock) convertCipher() error {
// the old keyfile back.
if err := fs.genEncryptionKeysForBlock(mb); err != nil {
keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))
<-dios
os.WriteFile(keyFile, ekey, defaultFilePerms)
dios <- struct{}{}
fs.writeFileWithOptionalSync(keyFile, ekey, defaultFilePerms)
return err
}
mb.bek.XORKeyStream(buf, buf)
@@ -1249,6 +1237,13 @@ func (mb *msgBlock) convertToEncrypted() error {
return nil
}
// Return the mb's index.
func (mb *msgBlock) getIndex() uint32 {
	mb.mu.RLock()
	// Capture under the read lock, then release before returning.
	index := mb.index
	mb.mu.RUnlock()
	return index
}
// Rebuild the state of the blk based on what we have on disk in the N.blk file.
// We will return any lost data, and we will return any delete tombstones we encountered.
func (mb *msgBlock) rebuildState() (*LostStreamData, []uint64, error) {
@@ -2607,9 +2602,14 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
return ss
}
// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
// Will return -1 if no matches at all.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
// This is used to see if we can selectively jump start blocks based on filter subject and a starting block index.
// Will return -1 and ErrStoreEOF if no matches at all or no more from where we are.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool, bi int) (int, error) {
// If we match everything, just move to next blk.
if filter == _EMPTY_ || filter == fwcs {
return bi + 1, nil
}
// Move through psim to gather start and stop bounds.
start, stop := uint32(math.MaxUint32), uint32(0)
if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
@@ -2623,51 +2623,26 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
start, stop = psi.fblk, psi.lblk
}
// Nothing found.
// Nothing was found.
if start == uint32(math.MaxUint32) {
return -1, -1
return -1, ErrStoreEOF
}
// Here we need to translate this to index into fs.blks properly.
mb := fs.bim[start]
if mb == nil {
// psim fblk can be lazy.
i := start + 1
for ; i <= stop; i++ {
mb = fs.bim[i]
if mb == nil {
continue
}
if _, f, _ := mb.filteredPending(filter, wc, 0); f > 0 {
break
}
}
// Update fblk since fblk was outdated.
if !wc {
if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
psi.fblk = i
}
} else {
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
}
})
// Can not be nil so ok to inline dereference.
mbi := fs.blks[bi].getIndex()
// All matching msgs are behind us.
// Less than AND equal is important because we were called because we missed searching bi.
if stop <= mbi {
return -1, ErrStoreEOF
}
// If start is > index return dereference of fs.blks index.
if start > mbi {
if mb := fs.bim[start]; mb != nil {
ni, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
return ni, nil
}
}
// Still nothing.
if mb == nil {
return -1, -1
}
// Grab first index.
fi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
// Grab last if applicable.
var li int
if mb = fs.bim[stop]; mb != nil {
li, _ = fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
}
return fi, li
// Otherwise just bump to the next one.
return bi + 1, nil
}
// Optimized way for getting all num pending matching a filter subject.
@@ -3283,9 +3258,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
@@ -3987,6 +3960,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.bytes = 0
}
// Allow us to check compaction again.
mb.noCompact = false
// Mark as dirty for stream state.
fs.dirty++
@@ -4029,7 +4005,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// All other more thorough cleanup will happen in syncBlocks logic.
// Note that we do not have to store empty records for the deleted, so don't use to calculate.
// TODO(dlc) - This should not be inline, should kick the sync routine.
if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock {
if !isLastBlock && mb.shouldCompactInline() {
mb.compact()
}
}
@@ -4091,6 +4067,21 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
return true, nil
}
// Tests whether we should try to compact this block while inline removing msgs.
// We will want rbytes to be over the minimum and have a 2x potential savings.
// Lock should be held.
func (mb *msgBlock) shouldCompactInline() bool {
	// Require at least a 2x savings and a block over the minimum size.
	return mb.bytes*2 < mb.rbytes && mb.rbytes > compactMinimum
}
// Tests whether we should try to compact this block while running periodic sync.
// We will want rbytes to be over the minimum and have a 2x potential savings.
// Ignores 2MB minimum.
// Lock should be held.
func (mb *msgBlock) shouldCompactSync() bool {
	// Skip blocks flagged as non-compactable by a prior no-op compact.
	if mb.noCompact {
		return false
	}
	return mb.bytes*2 < mb.rbytes
}
// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
@@ -4197,7 +4188,12 @@ func (mb *msgBlock) compact() {
mb.needSync = true
// Capture the updated rbytes.
mb.rbytes = uint64(len(nbuf))
if rbytes := uint64(len(nbuf)); rbytes == mb.rbytes {
// No change, so set our noCompact bool here to avoid attempting to continually compress in syncBlocks.
mb.noCompact = true
} else {
mb.rbytes = rbytes
}
// Remove any seqs from the beginning of the blk.
for seq, nfseq := fseq, atomic.LoadUint64(&mb.first.seq); seq < nfseq; seq++ {
@@ -4984,6 +4980,9 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
}
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
} else {
// Make sure to account for tombstones in rbytes.
mb.rbytes += rl
}
fch, werr := mb.fch, mb.werr
@@ -5327,7 +5326,7 @@ func (fs *fileStore) syncBlocks() {
// Check if we should compact here as well.
// Do not compact last mb.
var needsCompact bool
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes {
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.shouldCompactSync() {
needsCompact = true
markDirty = true
}
@@ -6424,10 +6423,13 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Nothing found in this block. We missed, if first block (bi) check psim.
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
// We should not do this at all if we are already on the last block.
// Also if we are a wildcard do not check if large subject space.
const wcMaxSizeToCheck = 64 * 1024
if i == bi && i < len(fs.blks)-1 && (!wc || fs.psim.Size() < wcMaxSizeToCheck) {
nbi, err := fs.checkSkipFirstBlock(filter, wc, bi)
// Nothing available.
if nbi < 0 || lbi <= bi {
if err == ErrStoreEOF {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
@@ -6529,9 +6531,7 @@ func (fs *fileStore) State() StreamState {
// Can not be guaranteed to be sorted.
if len(state.Deleted) > 0 {
sort.Slice(state.Deleted, func(i, j int) bool {
return state.Deleted[i] < state.Deleted[j]
})
slices.Sort(state.Deleted)
state.NumDeleted = len(state.Deleted)
}
return state
@@ -8556,9 +8556,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
if err != nil {
return nil, err
}
<-dios
err = os.WriteFile(o.ifn, state, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(o.ifn, state, defaultFilePerms)
if err != nil {
if didCreate {
os.RemoveAll(odir)
@@ -9032,9 +9030,7 @@ func (o *consumerFileStore) writeState(buf []byte) error {
o.mu.Unlock()
// Lock not held here but we do limit number of outstanding calls that could block OS threads.
<-dios
err := os.WriteFile(ifn, buf, defaultFilePerms)
dios <- struct{}{}
err := o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms)
o.mu.Lock()
if err != nil {
@@ -9073,9 +9069,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
@@ -9096,9 +9090,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
b = cfs.aek.Seal(nonce, nonce, b, nil)
}
<-dios
err = os.WriteFile(meta, b, defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(meta, b, defaultFilePerms)
if err != nil {
return err
}
@@ -9107,9 +9099,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
checksum := hex.EncodeToString(cfs.hh.Sum(nil))
sum := filepath.Join(cfs.odir, JetStreamMetaFileSum)
<-dios
err = os.WriteFile(sum, []byte(checksum), defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms)
if err != nil {
return err
}
@@ -9404,9 +9394,7 @@ func (o *consumerFileStore) Stop() error {
if len(buf) > 0 {
o.waitOnFlusher()
<-dios
err = os.WriteFile(ifn, buf, defaultFilePerms)
dios <- struct{}{}
err = o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms)
}
return err
}
@@ -9640,3 +9628,26 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {
return output, reader.Close()
}
// writeFileWithOptionalSync is equivalent to os.WriteFile() but optionally
// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is
// handled automatically by this function, so don't wrap calls to it in dios.
func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error {
	// Acquire an I/O slot from the dios semaphore for the duration of the call.
	<-dios
	defer func() { dios <- struct{}{} }()

	mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
	if fs.fcfg.SyncAlways {
		mode |= os.O_SYNC
	}
	fd, err := os.OpenFile(name, mode, perm)
	if err != nil {
		return err
	}
	if _, werr := fd.Write(data); werr != nil {
		// Return the write error; the close error is secondary here.
		fd.Close()
		return werr
	}
	return fd.Close()
}

View File

@@ -15,6 +15,7 @@ package server
import (
"bytes"
"cmp"
"crypto/sha256"
"crypto/tls"
"encoding/json"
@@ -22,7 +23,7 @@ import (
"math/rand"
"net"
"net/url"
"sort"
"slices"
"strconv"
"sync"
"sync/atomic"
@@ -1730,9 +1731,7 @@ func (s *Server) getOutboundGatewayConnections(a *[]*client) {
// Gateway write lock is held on entry
func (g *srvGateway) orderOutboundConnectionsLocked() {
// Order the gateways by lowest RTT
sort.Slice(g.outo, func(i, j int) bool {
return g.outo[i].getRTTValue() < g.outo[j].getRTTValue()
})
slices.SortFunc(g.outo, func(i, j *client) int { return cmp.Compare(i.getRTTValue(), j.getRTTValue()) })
}
// Orders the array of outbound connections.
@@ -2131,7 +2130,7 @@ func (c *client) processGatewayRSub(arg []byte) error {
// for queue subscriptions.
// <Outbound connection: invoked when client message is published,
// so from any client connection's readLoop>
func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
func (c *client) gatewayInterest(acc string, subj []byte) (bool, *SublistResult) {
ei, accountInMap := c.gw.outsim.Load(acc)
// If there is an entry for this account and ei is nil,
// it means that the remote is not interested at all in
@@ -2152,14 +2151,14 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
// but until e.ni is nil, use it to know if we
// should suppress interest or not.
if !c.gw.interestOnlyMode && e.ni != nil {
if _, inMap := e.ni[subj]; !inMap {
if _, inMap := e.ni[string(subj)]; !inMap {
psi = true
}
}
// If we are in modeInterestOnly (e.ni will be nil)
// or if we have queue subs, we also need to check sl.Match.
if e.ni == nil || e.qsubs > 0 {
r = e.sl.Match(subj)
r = e.sl.MatchBytes(subj)
if len(r.psubs) > 0 {
psi = true
}
@@ -2482,7 +2481,7 @@ func (g *srvGateway) shouldMapReplyForGatewaySend(acc *Account, reply []byte) bo
}
sl := sli.(*Sublist)
if sl.Count() > 0 {
if r := sl.Match(string(reply)); len(r.psubs)+len(r.qsubs) > 0 {
if sl.HasInterest(string(reply)) {
return true
}
}
@@ -2568,7 +2567,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
}
} else {
// Plain sub interest and queue sub results for this account/subject
psi, qr := gwc.gatewayInterest(accName, string(subject))
psi, qr := gwc.gatewayInterest(accName, subject)
if !psi && qr == nil {
continue
}

View File

@@ -15,6 +15,7 @@ package server
import (
"bytes"
"cmp"
"encoding/json"
"errors"
"fmt"
@@ -22,7 +23,7 @@ import (
"os"
"path/filepath"
"runtime"
"sort"
"slices"
"strconv"
"strings"
"sync/atomic"
@@ -824,10 +825,10 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
const warnThresh = 32
const warnThresh = 128
pending := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if pending > warnThresh {
s.RateLimitWarnf("JetStream request queue has high pending count: %d", pending)
if pending >= warnThresh {
s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending)
}
}
@@ -1203,8 +1204,8 @@ func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, _ *Account
}
ts := acc.templates()
sort.Slice(ts, func(i, j int) bool {
return strings.Compare(ts[i].StreamTemplateConfig.Name, ts[j].StreamTemplateConfig.Name) < 0
slices.SortFunc(ts, func(i, j *streamTemplate) int {
return cmp.Compare(i.StreamTemplateConfig.Name, j.StreamTemplateConfig.Name)
})
tcnt := len(ts)
@@ -1624,7 +1625,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
}
js.mu.RUnlock()
if len(resp.Streams) > 1 {
sort.Slice(resp.Streams, func(i, j int) bool { return strings.Compare(resp.Streams[i], resp.Streams[j]) < 0 })
slices.Sort(resp.Streams)
}
numStreams = len(resp.Streams)
if offset > numStreams {
@@ -1640,9 +1641,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
msets := acc.filteredStreams(filter)
// Since we page results order matters.
if len(msets) > 1 {
sort.Slice(msets, func(i, j int) bool {
return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0
})
slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
}
numStreams = len(msets)
@@ -1739,9 +1738,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
msets = acc.filteredStreams(filter)
}
sort.Slice(msets, func(i, j int) bool {
return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0
})
slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
scnt := len(msets)
if offset > scnt {
@@ -1944,7 +1941,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
subjs = append(subjs, subj)
}
// Sort it
sort.Strings(subjs)
slices.Sort(subjs)
if offset > len(subjs) {
offset = len(subjs)
@@ -4074,7 +4071,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
resp.Consumers = append(resp.Consumers, consumer)
}
if len(resp.Consumers) > 1 {
sort.Slice(resp.Consumers, func(i, j int) bool { return strings.Compare(resp.Consumers[i], resp.Consumers[j]) < 0 })
slices.Sort(resp.Consumers)
}
numConsumers = len(resp.Consumers)
if offset > numConsumers {
@@ -4095,9 +4092,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
}
obs := mset.getPublicConsumers()
sort.Slice(obs, func(i, j int) bool {
return strings.Compare(obs[i].name, obs[j].name) < 0
})
slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
numConsumers = len(obs)
if offset > numConsumers {
@@ -4190,9 +4185,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
}
obs := mset.getPublicConsumers()
sort.Slice(obs, func(i, j int) bool {
return strings.Compare(obs[i].name, obs[j].name) < 0
})
slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
ocnt := len(obs)
if offset > ocnt {

View File

@@ -15,6 +15,7 @@ package server
import (
"bytes"
"cmp"
crand "crypto/rand"
"encoding/binary"
"encoding/json"
@@ -25,7 +26,7 @@ import (
"os"
"path/filepath"
"reflect"
"sort"
"slices"
"strconv"
"strings"
"sync/atomic"
@@ -1141,17 +1142,24 @@ type recoveryUpdates struct {
// Streams and consumers are recovered from disk, and the meta layer's mappings
// should clean them up, but under crash scenarios there could be orphans.
func (js *jetStream) checkForOrphans() {
consumerName := func(o *consumer) string {
o.mu.RLock()
defer o.mu.RUnlock()
return o.name
}
// Can not hold jetstream lock while trying to delete streams or consumers.
js.mu.Lock()
s, cc := js.srv, js.cluster
s.Debugf("JetStream cluster checking for orphans")
// We only want to cleanup any orphans if we know we are current with the meta-leader.
meta := cc.meta
if meta == nil || meta.GroupLeader() == _EMPTY_ {
js.mu.Unlock()
s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
return
}
if !meta.Healthy() {
js.mu.Unlock()
s.Debugf("JetStream cluster skipping check for orphans, not current with the meta-leader")
return
}
var streams []*stream
var consumers []*consumer
@@ -1164,8 +1172,7 @@ func (js *jetStream) checkForOrphans() {
} else {
// This one is good, check consumers now.
for _, o := range mset.getConsumers() {
consumer := consumerName(o)
if sa.consumers[consumer] == nil {
if sa.consumers[o.String()] == nil {
consumers = append(consumers, o)
}
}
@@ -1333,6 +1340,11 @@ func (js *jetStream) monitorCluster() {
updateConsumers: make(map[string]*consumerAssignment),
}
// Make sure to cancel any pending checkForOrphans calls if the
// monitor goroutine exits.
var oc *time.Timer
defer stopAndClearTimer(&oc)
for {
select {
case <-s.quitCh:
@@ -1369,7 +1381,7 @@ func (js *jetStream) monitorCluster() {
// Clear.
ru = nil
s.Debugf("Recovered JetStream cluster metadata")
js.checkForOrphans()
oc = time.AfterFunc(30*time.Second, js.checkForOrphans)
// Do a health check here as well.
go checkHealth()
continue
@@ -2249,7 +2261,12 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
// This should be below the checkInMonitor call though to avoid stopping it out
// from underneath the one that is running since it will be the same raft node.
defer n.Stop()
defer func() {
// We might be closing during shutdown, don't pre-emptively stop here since we'll still want to install snapshots.
if !mset.closed.Load() {
n.Stop()
}
}()
qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
@@ -2408,6 +2425,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
return
case <-aq.ch:
var ne, nb uint64
// If we bump clfs we will want to write out snapshot if within our time window.
pclfs := mset.getCLFS()
ces := aq.pop()
for _, ce := range ces {
// No special processing needed for when we are caught up on restart.
@@ -2424,6 +2444,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
continue
}
// Apply our entries.
if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
// Update our applied.
@@ -2455,7 +2476,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Check about snapshotting
// If we have at least min entries to compact, go ahead and try to snapshot/compact.
if ne >= compactNumMin || nb > compactSizeMin {
if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs {
// We want to make sure we do not short circuit if transistioning from no clfs.
if pclfs == 0 {
// This is always false by default.
lastState.firstNeedsUpdate = true
lastSnapTime = time.Time{}
}
doSnapshot()
}
@@ -3114,13 +3141,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
}
if !isRecovering && !mset.IsLeader() {
if isRecovering || !mset.IsLeader() {
if err := mset.processSnapshot(ss); err != nil {
return err
}
} else if isRecovering {
// On recovery, reset CLFS/FAILED.
mset.setCLFS(ss.Failed)
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
@@ -5894,15 +5918,15 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
return nil, &err
}
// Sort based on available from most to least, breaking ties by number of total streams assigned to the peer.
sort.Slice(nodes, func(i, j int) bool {
if nodes[i].avail == nodes[j].avail {
return nodes[i].ns < nodes[j].ns
slices.SortFunc(nodes, func(i, j wn) int {
if i.avail == j.avail {
return cmp.Compare(i.ns, j.ns)
}
return nodes[i].avail > nodes[j].avail
return -cmp.Compare(i.avail, j.avail) // reverse
})
// If we are placing a replicated stream, let's sort based on HAAssets, as that is more important to balance.
if cfg.Replicas > 1 {
sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].ha < nodes[j].ha })
slices.SortStableFunc(nodes, func(i, j wn) int { return cmp.Compare(i.ha, j.ha) })
}
var results []string
@@ -6662,9 +6686,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
// Needs to be sorted for offsets etc.
if len(streams) > 1 {
sort.Slice(streams, func(i, j int) bool {
return strings.Compare(streams[i].Config.Name, streams[j].Config.Name) < 0
})
slices.SortFunc(streams, func(i, j *streamAssignment) int { return cmp.Compare(i.Config.Name, j.Config.Name) })
}
scnt := len(streams)
@@ -6777,9 +6799,7 @@ LOOP:
// Needs to be sorted as well.
if len(resp.Streams) > 1 {
sort.Slice(resp.Streams, func(i, j int) bool {
return strings.Compare(resp.Streams[i].Config.Name, resp.Streams[j].Config.Name) < 0
})
slices.SortFunc(resp.Streams, func(i, j *StreamInfo) int { return cmp.Compare(i.Config.Name, j.Config.Name) })
}
resp.Total = scnt
@@ -6812,9 +6832,7 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
}
// Needs to be sorted.
if len(consumers) > 1 {
sort.Slice(consumers, func(i, j int) bool {
return strings.Compare(consumers[i].Name, consumers[j].Name) < 0
})
slices.SortFunc(consumers, func(i, j *consumerAssignment) int { return cmp.Compare(i.Config.Name, j.Config.Name) })
}
ocnt := len(consumers)
@@ -6924,9 +6942,7 @@ LOOP:
// Needs to be sorted as well.
if len(resp.Consumers) > 1 {
sort.Slice(resp.Consumers, func(i, j int) bool {
return strings.Compare(resp.Consumers[i].Name, resp.Consumers[j].Name) < 0
})
slices.SortFunc(resp.Consumers, func(i, j *ConsumerInfo) int { return cmp.Compare(i.Name, j.Name) })
}
resp.Total = ocnt
@@ -7799,13 +7815,16 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
// Expected last sequence per subject.
// We can check for last sequence per subject but only if the expected seq <= lseq.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 && seq <= lseq {
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq <= lseq {
var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(subject, &smv)
if sm != nil {
fseq = sm.seq
}
if err == ErrStoreMsgNotFound && seq == 0 {
fseq, err = 0, nil
}
if err != nil || fseq != seq {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
@@ -8537,9 +8556,7 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
}
// Order the result based on the name so that we get something consistent
// when doing repeated stream info in the CLI, etc...
sort.Slice(ci.Replicas, func(i, j int) bool {
return ci.Replicas[i].Name < ci.Replicas[j].Name
})
slices.SortFunc(ci.Replicas, func(i, j *PeerInfo) int { return cmp.Compare(i.Name, j.Name) })
return ci
}
@@ -8599,9 +8616,8 @@ func (js *jetStream) streamAlternates(ci *ClientInfo, stream string) []StreamAlt
}
// Sort based on our weights that originate from the request itself.
sort.Slice(alts, func(i, j int) bool {
return weights[alts[i].Cluster] > weights[alts[j].Cluster]
})
// reverse sort
slices.SortFunc(alts, func(i, j StreamAlternate) int { return -cmp.Compare(weights[i.Cluster], weights[j.Cluster]) })
return alts
}

View File

@@ -34,6 +34,7 @@ import (
"sync"
"sync/atomic"
"time"
"unicode"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
@@ -1764,6 +1765,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
return err
}
// Reject a cluster that contains spaces or line breaks.
if proto.Cluster != _EMPTY_ && strings.ContainsFunc(proto.Cluster, unicode.IsSpace) {
c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
c.closeConnection(ProtocolViolation)
return ErrClusterNameHasSpaces
}
// Check for cluster name collisions.
if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
@@ -2669,9 +2677,8 @@ func (c *client) processInboundLeafMsg(msg []byte) {
// Go back to the sublist data structure.
if !ok {
r = c.acc.sl.Match(subject)
c.in.results[subject] = r
// Prune the results cache. Keeps us from unbounded growth. Random delete.
if len(c.in.results) > maxResultCacheSize {
if len(c.in.results) >= maxResultCacheSize {
n := 0
for subj := range c.in.results {
delete(c.in.results, subj)
@@ -2680,6 +2687,8 @@ func (c *client) processInboundLeafMsg(msg []byte) {
}
}
}
// Then add the new cache entry.
c.in.results[subject] = r
}
// Collect queue names if needed.

View File

@@ -219,6 +219,14 @@ func (s *Server) Warnf(format string, v ...any) {
}, format, v...)
}
func (s *Server) rateLimitFormatWarnf(format string, v ...any) {
if _, loaded := s.rateLimitLogging.LoadOrStore(format, time.Now()); loaded {
return
}
statement := fmt.Sprintf(format, v...)
s.Warnf("%s", statement)
}
func (s *Server) RateLimitWarnf(format string, v ...any) {
statement := fmt.Sprintf(format, v...)
if _, loaded := s.rateLimitLogging.LoadOrStore(statement, time.Now()); loaded {

View File

@@ -15,6 +15,7 @@ package server
import (
"bytes"
"cmp"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
@@ -29,6 +30,7 @@ import (
"path/filepath"
"runtime"
"runtime/pprof"
"slices"
"sort"
"strconv"
"strings"
@@ -2724,15 +2726,16 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
// JSzOptions are options passed to Jsz
type JSzOptions struct {
Account string `json:"account,omitempty"`
Accounts bool `json:"accounts,omitempty"`
Streams bool `json:"streams,omitempty"`
Consumer bool `json:"consumer,omitempty"`
Config bool `json:"config,omitempty"`
LeaderOnly bool `json:"leader_only,omitempty"`
Offset int `json:"offset,omitempty"`
Limit int `json:"limit,omitempty"`
RaftGroups bool `json:"raft,omitempty"`
Account string `json:"account,omitempty"`
Accounts bool `json:"accounts,omitempty"`
Streams bool `json:"streams,omitempty"`
Consumer bool `json:"consumer,omitempty"`
Config bool `json:"config,omitempty"`
LeaderOnly bool `json:"leader_only,omitempty"`
Offset int `json:"offset,omitempty"`
Limit int `json:"limit,omitempty"`
RaftGroups bool `json:"raft,omitempty"`
StreamLeaderOnly bool `json:"stream_leader_only,omitempty"`
}
// HealthzOptions are options passed to Healthz
@@ -2749,8 +2752,9 @@ type HealthzOptions struct {
// ProfilezOptions are options passed to Profilez
type ProfilezOptions struct {
Name string `json:"name"`
Debug int `json:"debug"`
Name string `json:"name"`
Debug int `json:"debug"`
Duration time.Duration `json:"duration,omitempty"`
}
// StreamDetail shows information about the stream state and its consumers.
@@ -2806,7 +2810,7 @@ type JSInfo struct {
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
}
func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft bool) *AccountDetail {
func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail {
jsa.mu.RLock()
acc := jsa.account
name := acc.GetName()
@@ -2852,6 +2856,10 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg,
c := stream.config()
cfg = &c
}
// Skip if we are only looking for stream leaders.
if optStreamLeader && ci != nil && ci.Leader != s.Name() {
continue
}
sdet := StreamDetail{
Name: stream.name(),
Created: stream.createdTime(),
@@ -2907,7 +2915,7 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) {
if !ok {
return nil, fmt.Errorf("account %q not jetstream enabled", acc)
}
return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups), nil
return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly), nil
}
// helper to get cluster info from node via dummy group
@@ -3012,9 +3020,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
accounts = []*jsAccount{accounts[filterIdx]}
} else if opts.Accounts {
if opts.Offset != 0 {
sort.Slice(accounts, func(i, j int) bool {
return strings.Compare(accounts[i].acc().Name, accounts[j].acc().Name) < 0
})
slices.SortFunc(accounts, func(i, j *jsAccount) int { return cmp.Compare(i.acc().Name, j.acc().Name) })
if opts.Offset > len(accounts) {
accounts = []*jsAccount{}
} else {
@@ -3034,7 +3040,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
}
// if wanted, obtain accounts/streams/consumer
for _, jsa := range accounts {
detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups)
detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly)
jsi.AccountDetails = append(jsi.AccountDetails, detail)
}
return jsi, nil
@@ -3078,16 +3084,22 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) {
return
}
sleader, err := decodeBool(w, r, "stream-leader-only")
if err != nil {
return
}
l, err := s.Jsz(&JSzOptions{
r.URL.Query().Get("acc"),
accounts,
streams,
consumers,
config,
leader,
offset,
limit,
rgroups,
Account: r.URL.Query().Get("acc"),
Accounts: accounts,
Streams: streams,
Consumer: consumers,
Config: config,
LeaderOnly: leader,
Offset: offset,
Limit: limit,
RaftGroups: rgroups,
StreamLeaderOnly: sleader,
})
if err != nil {
w.WriteHeader(http.StatusBadRequest)
@@ -3724,21 +3736,36 @@ type ProfilezStatus struct {
}
func (s *Server) profilez(opts *ProfilezOptions) *ProfilezStatus {
if opts.Name == _EMPTY_ {
var buffer bytes.Buffer
switch opts.Name {
case _EMPTY_:
return &ProfilezStatus{
Error: "Profile name not specified",
}
}
profile := pprof.Lookup(opts.Name)
if profile == nil {
return &ProfilezStatus{
Error: fmt.Sprintf("Profile %q not found", opts.Name),
case "cpu":
if opts.Duration <= 0 || opts.Duration > 15*time.Second {
return &ProfilezStatus{
Error: fmt.Sprintf("Duration %s should be between 0s and 15s", opts.Duration),
}
}
}
var buffer bytes.Buffer
if err := profile.WriteTo(&buffer, opts.Debug); err != nil {
return &ProfilezStatus{
Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err),
if err := pprof.StartCPUProfile(&buffer); err != nil {
return &ProfilezStatus{
Error: fmt.Sprintf("Failed to start CPU profile: %s", err),
}
}
time.Sleep(opts.Duration)
pprof.StopCPUProfile()
default:
profile := pprof.Lookup(opts.Name)
if profile == nil {
return &ProfilezStatus{
Error: fmt.Sprintf("Profile %q not found", opts.Name),
}
}
if err := profile.WriteTo(&buffer, opts.Debug); err != nil {
return &ProfilezStatus{
Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err),
}
}
}
return &ProfilezStatus{
@@ -3787,8 +3814,14 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
gfilter := r.URL.Query().Get("group")
afilter := r.URL.Query().Get("acc")
if afilter == "" {
afilter = s.SystemAccount().Name
if afilter == _EMPTY_ {
if sys := s.SystemAccount(); sys != nil {
afilter = sys.Name
} else {
w.WriteHeader(404)
w.Write([]byte("System account not found, the server may be shutting down"))
return
}
}
groups := map[string]RaftNode{}

View File

@@ -15,6 +15,7 @@ package server
import (
"bytes"
"cmp"
"crypto/tls"
"encoding/binary"
"encoding/json"
@@ -23,7 +24,7 @@ import (
"io"
"net"
"net/http"
"sort"
"slices"
"strconv"
"strings"
"sync"
@@ -4276,9 +4277,7 @@ func (s *Server) mqttCheckPubRetainedPerms() {
})
}
asm.mu.RUnlock()
sort.Slice(rms, func(i, j int) bool {
return rms[i].rmsg.sseq < rms[j].rmsg.sseq
})
slices.SortFunc(rms, func(i, j retainedMsg) int { return cmp.Compare(i.rmsg.sseq, j.rmsg.sseq) })
perms := map[string]*perm{}
deletes := map[string]uint64{}

View File

@@ -25,10 +25,12 @@ import (
"syscall"
"time"
"unsafe"
"golang.org/x/sys/windows"
)
var (
pdh = syscall.NewLazyDLL("pdh.dll")
pdh = windows.NewLazySystemDLL("pdh.dll")
winPdhOpenQuery = pdh.NewProc("PdhOpenQuery")
winPdhAddCounter = pdh.NewProc("PdhAddCounterW")
winPdhCollectQueryData = pdh.NewProc("PdhCollectQueryData")
@@ -36,6 +38,20 @@ var (
winPdhGetFormattedCounterArray = pdh.NewProc("PdhGetFormattedCounterArrayW")
)
func init() {
if err := pdh.Load(); err != nil {
panic(err)
}
for _, p := range []*windows.LazyProc{
winPdhOpenQuery, winPdhAddCounter, winPdhCollectQueryData,
winPdhGetFormattedCounterValue, winPdhGetFormattedCounterArray,
} {
if err := p.Find(); err != nil {
panic(err)
}
}
}
// global performance counter query handle and counters
var (
pcHandle PDH_HQUERY

View File

@@ -499,7 +499,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
if err := n.createInternalSubs(); err != nil {
n.shutdown(true)
n.shutdown(false)
return nil, err
}

View File

@@ -14,12 +14,13 @@
package server
import (
"cmp"
"crypto/tls"
"errors"
"fmt"
"net/url"
"reflect"
"sort"
"slices"
"strings"
"sync/atomic"
"time"
@@ -1131,38 +1132,24 @@ func (s *Server) reloadOptions(curOpts, newOpts *Options) error {
func imposeOrder(value any) error {
switch value := value.(type) {
case []*Account:
sort.Slice(value, func(i, j int) bool {
return value[i].Name < value[j].Name
})
slices.SortFunc(value, func(i, j *Account) int { return cmp.Compare(i.Name, j.Name) })
for _, a := range value {
sort.Slice(a.imports.streams, func(i, j int) bool {
return a.imports.streams[i].acc.Name < a.imports.streams[j].acc.Name
})
slices.SortFunc(a.imports.streams, func(i, j *streamImport) int { return cmp.Compare(i.acc.Name, j.acc.Name) })
}
case []*User:
sort.Slice(value, func(i, j int) bool {
return value[i].Username < value[j].Username
})
slices.SortFunc(value, func(i, j *User) int { return cmp.Compare(i.Username, j.Username) })
case []*NkeyUser:
sort.Slice(value, func(i, j int) bool {
return value[i].Nkey < value[j].Nkey
})
slices.SortFunc(value, func(i, j *NkeyUser) int { return cmp.Compare(i.Nkey, j.Nkey) })
case []*url.URL:
sort.Slice(value, func(i, j int) bool {
return value[i].String() < value[j].String()
})
slices.SortFunc(value, func(i, j *url.URL) int { return cmp.Compare(i.String(), j.String()) })
case []string:
sort.Strings(value)
slices.Sort(value)
case []*jwt.OperatorClaims:
sort.Slice(value, func(i, j int) bool {
return value[i].Issuer < value[j].Issuer
})
slices.SortFunc(value, func(i, j *jwt.OperatorClaims) int { return cmp.Compare(i.Issuer, j.Issuer) })
case GatewayOpts:
sort.Slice(value.Gateways, func(i, j int) bool {
return value.Gateways[i].Name < value.Gateways[j].Name
})
slices.SortFunc(value.Gateways, func(i, j *RemoteGatewayOpts) int { return cmp.Compare(i.Name, j.Name) })
case WebsocketOpts:
sort.Strings(value.AllowedOrigins)
slices.Sort(value.AllowedOrigins)
case string, bool, uint8, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet,
*URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList,
*OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig:

View File

@@ -27,8 +27,10 @@ import (
"math/rand"
"net"
"net/http"
"net/url"
"regexp"
"runtime/pprof"
"unicode"
// Allow dynamic profiling.
_ "net/http/pprof"
@@ -953,11 +955,13 @@ func (s *Server) serverName() string {
func (s *Server) ClientURL() string {
// FIXME(dlc) - should we add in user and pass if defined single?
opts := s.getOpts()
scheme := "nats://"
var u url.URL
u.Scheme = "nats"
if opts.TLSConfig != nil {
scheme = "tls://"
u.Scheme = "tls"
}
return fmt.Sprintf("%s%s:%d", scheme, opts.Host, opts.Port)
u.Host = net.JoinHostPort(opts.Host, fmt.Sprintf("%d", opts.Port))
return u.String()
}
func validateCluster(o *Options) error {
@@ -1698,12 +1702,14 @@ func (s *Server) setSystemAccount(acc *Account) error {
replies: make(map[string]msgHandler),
sendq: newIPQueue[*pubMsg](s, "System sendQ"),
recvq: newIPQueue[*inSysMsg](s, "System recvQ"),
recvqp: newIPQueue[*inSysMsg](s, "System recvQ Pings"),
resetCh: make(chan struct{}),
sq: s.newSendQ(),
statsz: eventsHBInterval,
orphMax: 5 * eventsHBInterval,
chkOrph: 3 * eventsHBInterval,
}
recvq, recvqp := s.sys.recvq, s.sys.recvqp
s.sys.wg.Add(1)
s.mu.Unlock()
@@ -1717,7 +1723,9 @@ func (s *Server) setSystemAccount(acc *Account) error {
go s.internalSendLoop(&s.sys.wg)
// Start the internal loop for inbound messages.
go s.internalReceiveLoop()
go s.internalReceiveLoop(recvq)
// Start the internal loop for inbound STATSZ/Ping messages.
go s.internalReceiveLoop(recvqp)
// Start up our general subscriptions
s.initEventTracking()
@@ -2348,6 +2356,9 @@ func (s *Server) Start() {
// Solicit remote servers for leaf node connections.
if len(opts.LeafNode.Remotes) > 0 {
s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
if opts.Cluster.Name == opts.ServerName && strings.ContainsFunc(opts.Cluster.Name, unicode.IsSpace) {
s.Warnf("Server name has spaces and used as the cluster name, leaf remotes may not connect properly")
}
}
// TODO (ik): I wanted to refactor this by starting the client
@@ -2772,6 +2783,7 @@ func (s *Server) StartProfiler() {
Addr: hp,
Handler: http.DefaultServeMux,
MaxHeaderBytes: 1 << 20,
ReadTimeout: time.Second * 5,
}
s.profiler = l
s.profilingServer = srv
@@ -2971,10 +2983,11 @@ func (s *Server) startMonitoring(secure bool) error {
// to return empty response or unable to display page if the
// server needs more time to build the response.
srv := &http.Server{
Addr: hp,
Handler: mux,
MaxHeaderBytes: 1 << 20,
ErrorLog: log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0),
Addr: hp,
Handler: mux,
MaxHeaderBytes: 1 << 20,
ErrorLog: log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0),
ReadHeaderTimeout: time.Second * 5,
}
s.mu.Lock()
s.http = httpListener
@@ -3294,7 +3307,7 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
// This will save off a closed client in a ring buffer such that
// /connz can inspect. Useful for debugging, etc.
func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) {
func (s *Server) saveClosedClient(c *client, nc net.Conn, subs map[string]*subscription, reason ClosedState) {
now := time.Now()
s.accountDisconnectEvent(c, now, reason.String())
@@ -3303,17 +3316,18 @@ func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) {
cc := &closedClient{}
cc.fill(c, nc, now, false)
// Note that cc.fill is using len(c.subs), which may have been set to nil by now,
// so replace cc.NumSubs with len(subs).
cc.NumSubs = uint32(len(subs))
cc.Stop = &now
cc.Reason = reason.String()
// Do subs, do not place by default in main ConnInfo
if len(c.subs) > 0 {
cc.subs = make([]SubDetail, 0, len(c.subs))
for _, sub := range c.subs {
if len(subs) > 0 {
cc.subs = make([]SubDetail, 0, len(subs))
for _, sub := range subs {
cc.subs = append(cc.subs, newSubDetail(sub))
}
// Now set this to nil to allow connection to be released.
c.subs = nil
}
// Hold user as well.
cc.user = c.getRawAuthUser()

View File

@@ -765,3 +765,11 @@ func stringToBytes(s string) []byte {
b := unsafe.Slice(p, len(s))
return b
}
// Forces a copy of a string, for use in the case that you might have been passed a value when bytesToString was used,
// but now you need a separate copy of it to store for longer-term use.
func copyString(s string) string {
b := make([]byte, len(s))
copy(b, s)
return bytesToString(b)
}

View File

@@ -236,6 +236,7 @@ type stream struct {
consumers map[string]*consumer // The consumers for this stream.
numFilter int // The number of filtered consumers.
cfg StreamConfig // The stream's config.
cfgMu sync.RWMutex // Config mutex used to solve some races with consumer code
created time.Time // Time the stream was created.
stype StorageType // The storage type.
tier string // The tier is the number of replicas for the stream (e.g. "R1" or "R3").
@@ -1018,6 +1019,9 @@ func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
}
func (mset *stream) getCLFS() uint64 {
if mset == nil {
return 0
}
mset.clMu.Lock()
defer mset.clMu.Unlock()
return mset.clfs
@@ -1950,7 +1954,14 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
}
// Now update config and store's version of our config.
// Although we are under the stream write lock, we will also assign the new
// configuration under mset.cfgMu lock. This is so that in places where
// mset.mu cannot be acquired (like many cases in consumer.go where code
// is under the consumer's lock), and the stream's configuration needs to
// be inspected, one can use mset.cfgMu's read lock to do that safely.
mset.cfgMu.Lock()
mset.cfg = *cfg
mset.cfgMu.Unlock()
// If we're changing retention and haven't errored because of consumer
// replicas by now, whip through and update the consumer retention.
@@ -2001,6 +2012,15 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
return nil
}
// Small helper to return the Name field from mset.cfg, protected by
// the mset.cfgMu mutex. This is simply because we have several places
// in consumer.go where we need it.
func (mset *stream) getCfgName() string {
mset.cfgMu.RLock()
defer mset.cfgMu.RUnlock()
return mset.cfg.Name
}
// Purge will remove all messages from the stream and underlying store based on the request.
func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
mset.mu.RLock()
@@ -4369,7 +4389,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
// Expected last sequence per subject.
// If we are clustered we have prechecked seq > 0.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
// TODO(dlc) - We could make a new store func that does this all in one.
var smv StoreMsg
@@ -4378,8 +4397,17 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if sm != nil {
fseq = sm.seq
}
if err == ErrStoreMsgNotFound && seq == 0 {
fseq, err = 0, nil
if err == ErrStoreMsgNotFound {
if seq == 0 {
fseq, err = 0, nil
} else if mset.isClustered() {
// Do not bump clfs in case message was not found and could have been deleted.
var ss StreamState
store.FastState(&ss)
if seq <= ss.LastSeq {
fseq, err = seq, nil
}
}
}
if err != nil || fseq != seq {
mset.mu.Unlock()
@@ -4521,7 +4549,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
mset.lmsgId = msgId
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
mset.storeMsgIdLocked(&ddentry{msgId, mset.lseq, ts})
}
if canRespond {
response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...)
@@ -4573,6 +4601,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
response, _ = json.Marshal(resp)
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
}
mset.mu.Unlock()
return err
}
}
@@ -5069,6 +5099,10 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
// Kick monitor and collect consumers first.
mset.mu.Lock()
// Mark closed.
mset.closed.Store(true)
// Signal to the monitor loop.
// Can't use qch here.
if mset.mqch != nil {
@@ -5129,9 +5163,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.sendDeleteAdvisoryLocked()
}
// Mark closed.
mset.closed.Store(true)
// Quit channel, do this after sending the delete advisory
if mset.qch != nil {
close(mset.qch)
@@ -5272,92 +5303,9 @@ func (mset *stream) checkInterestState() {
return
}
var zeroAcks []*consumer
var lowAckFloor uint64 = math.MaxUint64
for _, o := range mset.getConsumers() {
o.checkStateForInterestStream()
o.mu.Lock()
if o.isLeader() {
// We need to account for consumers with ack floor of zero.
// We will collect them and see if we need to check pending below.
if o.asflr == 0 {
zeroAcks = append(zeroAcks, o)
} else if o.asflr < lowAckFloor {
lowAckFloor = o.asflr
}
} else {
// We are a follower so only have the store state, so read that in.
state, err := o.store.State()
if err != nil {
// On error we will not have enough information to process correctly so bail.
o.mu.Unlock()
return
}
// We need to account for consumers with ack floor of zero.
if state.AckFloor.Stream == 0 {
zeroAcks = append(zeroAcks, o)
} else if state.AckFloor.Stream < lowAckFloor {
lowAckFloor = state.AckFloor.Stream
}
// We are a follower here but if we detect a drift from when we were previous leader correct here.
if o.asflr > state.AckFloor.Stream || o.sseq > state.Delivered.Stream+1 {
o.applyState(state)
}
}
o.mu.Unlock()
}
// If nothing was set we can bail.
if lowAckFloor == math.MaxUint64 {
return
}
// Capture our current state.
// ok to do so without lock.
var state StreamState
mset.store.FastState(&state)
if lowAckFloor <= state.FirstSeq {
return
}
// Do not want to hold stream lock if calculating numPending.
// Check if we had any zeroAcks, we will need to check them.
for _, o := range zeroAcks {
var np uint64
o.mu.RLock()
if o.isLeader() {
np = uint64(o.numPending())
} else {
np, _ = o.calculateNumPending()
}
o.mu.RUnlock()
// This means we have pending and can not remove anything at this time.
if np > 0 {
return
}
}
mset.mu.Lock()
defer mset.mu.Unlock()
// Check which purge we need to perform.
if lowAckFloor <= state.LastSeq || state.Msgs == 0 {
// Purge the stream to lowest ack floor + 1
mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0)
} else {
// Here we have a low ack floor higher then our last seq.
// So we will just do normal purge.
mset.store.Purge()
}
// Make sure to reset our local lseq.
mset.store.FastState(&state)
mset.lseq = state.LastSeq
// Also make sure we clear any pending acks.
mset.clearAllPreAcksBelowFloor(state.FirstSeq)
}
func (mset *stream) isInterestRetention() bool {

View File

@@ -51,9 +51,9 @@ func (n *node48) isFull() bool { return n.size >= 48 }
func (n *node48) grow() node {
nn := newNode256(n.prefix)
for c := byte(0); c < 255; c++ {
if i := n.key[c]; i > 0 {
nn.addChild(c, n.child[i-1])
for c := 0; c < len(n.key); c++ {
if i := n.key[byte(c)]; i > 0 {
nn.addChild(byte(c), n.child[i-1])
}
}
return nn
@@ -69,9 +69,9 @@ func (n *node48) deleteChild(c byte) {
last := byte(n.size - 1)
if i < last {
n.child[i] = n.child[last]
for c := byte(0); c <= 255; c++ {
if n.key[c] == last+1 {
n.key[c] = i + 1
for ic := 0; ic < len(n.key); ic++ {
if n.key[byte(ic)] == last+1 {
n.key[byte(ic)] = i + 1
break
}
}
@@ -87,9 +87,9 @@ func (n *node48) shrink() node {
return nil
}
nn := newNode16(nil)
for c := byte(0); c < 255; c++ {
if i := n.key[c]; i > 0 {
nn.addChild(c, n.child[i-1])
for c := 0; c < len(n.key); c++ {
if i := n.key[byte(c)]; i > 0 {
nn.addChild(byte(c), n.child[i-1])
}
}
return nn

View File

@@ -15,7 +15,7 @@ package stree
import (
"bytes"
"sort"
"slices"
)
// SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects.
@@ -382,7 +382,7 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T
}
}
// Now sort.
sort.SliceStable(nodes, func(i, j int) bool { return bytes.Compare(nodes[i].path(), nodes[j].path()) < 0 })
slices.SortStableFunc(nodes, func(a, b node) int { return bytes.Compare(a.path(), b.path()) })
// Now walk the nodes in order and call into next iter.
for i := range nodes {
if !t.iter(nodes[i], pre, cb) {

View File

@@ -527,7 +527,13 @@ var emptyResult = &SublistResult{}
// Match will match all entries to the literal subject.
// It will return a set of results for both normal and queue subscribers.
func (s *Sublist) Match(subject string) *SublistResult {
return s.match(subject, true)
return s.match(subject, true, false)
}
// MatchBytes will match all entries to the literal subject.
// It will return a set of results for both normal and queue subscribers.
func (s *Sublist) MatchBytes(subject []byte) *SublistResult {
return s.match(bytesToString(subject), true, true)
}
// HasInterest will return whether or not there is any interest in the subject.
@@ -537,10 +543,10 @@ func (s *Sublist) HasInterest(subject string) bool {
}
func (s *Sublist) matchNoLock(subject string) *SublistResult {
return s.match(subject, false)
return s.match(subject, false, false)
}
func (s *Sublist) match(subject string, doLock bool) *SublistResult {
func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *SublistResult {
atomic.AddUint64(&s.matches, 1)
// Check cache first.
@@ -595,6 +601,9 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
result = emptyResult
}
if cacheEnabled {
if doCopyOnCache {
subject = copyString(subject)
}
s.cache[subject] = result
n = len(s.cache)
}

View File

@@ -17,10 +17,23 @@
package sysmem
import (
"syscall"
"unsafe"
"golang.org/x/sys/windows"
)
var winKernel32 = windows.NewLazySystemDLL("kernel32.dll")
var winGlobalMemoryStatusEx = winKernel32.NewProc("GlobalMemoryStatusEx")
func init() {
if err := winKernel32.Load(); err != nil {
panic(err)
}
if err := winGlobalMemoryStatusEx.Find(); err != nil {
panic(err)
}
}
// https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/ns-sysinfoapi-memorystatusex
type _memoryStatusEx struct {
dwLength uint32
@@ -30,16 +43,8 @@ type _memoryStatusEx struct {
}
func Memory() int64 {
kernel32, err := syscall.LoadDLL("kernel32.dll")
if err != nil {
return 0
}
globalMemoryStatusEx, err := kernel32.FindProc("GlobalMemoryStatusEx")
if err != nil {
return 0
}
msx := &_memoryStatusEx{dwLength: 64}
res, _, _ := globalMemoryStatusEx.Call(uintptr(unsafe.Pointer(msx)))
res, _, _ := winGlobalMemoryStatusEx.Call(uintptr(unsafe.Pointer(msx)))
if res == 0 {
return 0
}

View File

@@ -667,7 +667,7 @@ func (c *client) wsHandleProtocolError(message string) error {
buf := wsCreateCloseMessage(wsCloseStatusProtocolError, message)
c.wsEnqueueControlMessage(wsCloseMessage, buf)
nbPoolPut(buf) // wsEnqueueControlMessage has taken a copy.
return fmt.Errorf(message)
return errors.New(message)
}
// Create a close message with the given `status` and `body`.

4
vendor/golang.org/x/time/LICENSE generated vendored
View File

@@ -1,4 +1,4 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Copyright 2009 The Go Authors.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
@@ -10,7 +10,7 @@ notice, this list of conditions and the following disclaimer.
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
* Neither the name of Google LLC nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

6
vendor/modules.txt vendored
View File

@@ -1426,8 +1426,8 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.5.8
## explicit; go 1.18
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.10.18
## explicit; go 1.21
# github.com/nats-io/nats-server/v2 v2.10.20
## explicit; go 1.21.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand
github.com/nats-io/nats-server/v2/internal/ldap
@@ -2252,7 +2252,7 @@ golang.org/x/text/transform
golang.org/x/text/unicode/bidi
golang.org/x/text/unicode/norm
golang.org/x/text/width
# golang.org/x/time v0.5.0
# golang.org/x/time v0.6.0
## explicit; go 1.18
golang.org/x/time/rate
# golang.org/x/tools v0.24.0