diff --git a/go.mod b/go.mod index 78736c2436..69d806e389 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.2.1 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.10.18 + github.com/nats-io/nats-server/v2 v2.10.20 github.com/nats-io/nats.go v1.37.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 @@ -344,7 +344,7 @@ require ( go.uber.org/zap v1.23.0 // indirect golang.org/x/mod v0.20.0 // indirect golang.org/x/sys v0.24.0 // indirect - golang.org/x/time v0.5.0 // indirect + golang.org/x/time v0.6.0 // indirect golang.org/x/tools v0.24.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect diff --git a/go.sum b/go.sum index 54452e05ce..6b36001b5f 100644 --- a/go.sum +++ b/go.sum @@ -893,8 +893,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.18 h1:tRdZmBuWKVAFYtayqlBB2BuCHNGAQPvoQIXOKwU3WSM= -github.com/nats-io/nats-server/v2 v2.10.18/go.mod h1:97Qyg7YydD8blKlR8yBsUlPlWyZKjA7Bp5cl3MUE9K8= +github.com/nats-io/nats-server/v2 v2.10.20 h1:CXDTYNHeBiAKBTAIP2gjpgbWap2GhATnTLgP8etyvEI= +github.com/nats-io/nats-server/v2 v2.10.20/go.mod h1:hgcPnoUtMfxz1qVOvLZGurVypQ+Cg6GXVXjG53iHk+M= github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= @@ -1549,8 +1549,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go index 4b24903a0d..f9897034d2 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "cmp" "encoding/hex" "errors" "fmt" @@ -25,7 +26,7 @@ import ( "net/http" "net/textproto" "reflect" - "sort" + "slices" "strconv" "strings" "sync" @@ -349,10 +350,8 @@ func (a *Account) updateRemoteServer(m *AccountNumConns) []*client { // conservative and bit harsh here. Clients will reconnect if we over compensate. var clients []*client if mtce { - clients := a.getClientsLocked() - sort.Slice(clients, func(i, j int) bool { - return clients[i].start.After(clients[j].start) - }) + clients = a.getClientsLocked() + slices.SortFunc(clients, func(i, j *client) int { return -i.start.Compare(j.start) }) // reserve over := (len(a.clients) - int(a.sysclients) + int(a.nrclients)) - int(a.mconns) if over < len(clients) { clients = clients[:over] @@ -669,7 +668,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error { } dests = append(dests, &destination{tr, aw}) } - sort.Slice(dests, func(i, j int) bool { return dests[i].weight < dests[j].weight }) + slices.SortFunc(dests, func(i, j *destination) int { return cmp.Compare(i.weight, j.weight) }) var lw uint8 for _, d := range dests { @@ -1478,11 +1477,13 @@ func (a *Account) addServiceImportWithClaim(destination *Account, from, to strin } // Check if this introduces a cycle before proceeding. - if err := a.serviceImportFormsCycle(destination, from); err != nil { - return err + // From will be the mapped subject. + // If the 'to' has a wildcard make sure we pre-transform the 'from' before we check for cycles, e.g. '$1' + fromT := from + if subjectHasWildcard(to) { + fromT, _ = transformUntokenize(from) } - - if err := a.serviceImportFormsCycle(destination, to); err != nil { + if err := a.serviceImportFormsCycle(destination, fromT); err != nil { return err } @@ -1807,7 +1808,7 @@ func (a *Account) _checkForReverseEntry(reply string, si *serviceImport, checkIn // Note that if we are here reply has to be a literal subject. if checkInterest { // If interest still exists we can not clean these up yet. - if rr := a.sl.Match(reply); len(rr.psubs)+len(rr.qsubs) > 0 { + if a.sl.HasInterest(reply) { a.mu.RUnlock() return } @@ -1925,6 +1926,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im tr *subjectTransform err error ) + if subjectHasWildcard(to) { // If to and from match, then we use the published subject. if to == from { @@ -3564,9 +3566,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim // Sort if we are over the limit. if a.MaxTotalConnectionsReached() { - sort.Slice(clients, func(i, j int) bool { - return clients[i].start.After(clients[j].start) - }) + slices.SortFunc(clients, func(i, j *client) int { return -i.start.Compare(j.start) }) // sort in reverse order } // If JetStream is enabled for this server we will call into configJetStream for the account @@ -3774,7 +3774,7 @@ func fetchAccount(res AccountResolver, name string) (string, error) { if !nkeys.IsValidPublicAccountKey(name) { return _EMPTY_, fmt.Errorf("will only fetch valid account keys") } - return res.Fetch(name) + return res.Fetch(copyString(name)) } // AccountResolver interface. This is to fetch Account JWTs by public nkeys diff --git a/vendor/github.com/nats-io/nats-server/v2/server/auth.go b/vendor/github.com/nats-io/nats-server/v2/server/auth.go index 716ecbfb4d..0a9564f4d0 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/auth.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/auth.go @@ -857,33 +857,6 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au // If we have a jwt and a userClaim, make sure we have the Account, etc associated. // We need to look up the account. This will use an account resolver if one is present. if juc != nil { - allowedConnTypes, err := convertAllowedConnectionTypes(juc.AllowedConnectionTypes) - if err != nil { - // We got an error, which means some connection types were unknown. As long as - // a valid one is returned, we proceed with auth. If not, we have to reject. - // In other words, suppose that JWT allows "WEBSOCKET" in the array. No error - // is returned and allowedConnTypes will contain "WEBSOCKET" only. - // Client will be rejected if not a websocket client, or proceed with rest of - // auth if it is. - // Now suppose JWT allows "WEBSOCKET, MQTT" and say MQTT is not known by this - // server. In this case, allowedConnTypes would contain "WEBSOCKET" and we - // would get `err` indicating that "MQTT" is an unknown connection type. - // If a websocket client connects, it should still be allowed, since after all - // the admin wanted to allow websocket and mqtt connection types. - // However, say that the JWT only allows "MQTT" (and again suppose this server - // does not know about MQTT connection type), then since the allowedConnTypes - // map would be empty (no valid types found), and since empty means allow-all, - // then we should reject because the intent was to allow connections for this - // user only as an MQTT client. - c.Debugf("%v", err) - if len(allowedConnTypes) == 0 { - return false - } - } - if !c.connectionTypeAllowed(allowedConnTypes) { - c.Debugf("Connection type not allowed") - return false - } issuer := juc.Issuer if juc.IssuerAccount != _EMPTY_ { issuer = juc.IssuerAccount @@ -926,6 +899,36 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au c.Debugf("Account does not allow bearer tokens") return false } + // We check the allowed connection types, but only after processing + // of scoped signer (so that it updates `juc` with what is defined + // in the account. + allowedConnTypes, err := convertAllowedConnectionTypes(juc.AllowedConnectionTypes) + if err != nil { + // We got an error, which means some connection types were unknown. As long as + // a valid one is returned, we proceed with auth. If not, we have to reject. + // In other words, suppose that JWT allows "WEBSOCKET" in the array. No error + // is returned and allowedConnTypes will contain "WEBSOCKET" only. + // Client will be rejected if not a websocket client, or proceed with rest of + // auth if it is. + // Now suppose JWT allows "WEBSOCKET, MQTT" and say MQTT is not known by this + // server. In this case, allowedConnTypes would contain "WEBSOCKET" and we + // would get `err` indicating that "MQTT" is an unknown connection type. + // If a websocket client connects, it should still be allowed, since after all + // the admin wanted to allow websocket and mqtt connection types. + // However, say that the JWT only allows "MQTT" (and again suppose this server + // does not know about MQTT connection type), then since the allowedConnTypes + // map would be empty (no valid types found), and since empty means allow-all, + // then we should reject because the intent was to allow connections for this + // user only as an MQTT client. + c.Debugf("%v", err) + if len(allowedConnTypes) == 0 { + return false + } + } + if !c.connectionTypeAllowed(allowedConnTypes) { + c.Debugf("Connection type not allowed") + return false + } // skip validation of nonce when presented with a bearer token // FIXME: if BearerToken is only for WSS, need check for server with that port enabled if !juc.BearerToken { @@ -978,12 +981,12 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au deniedSub := []string{} for _, sub := range denyAllJs { if c.perms.pub.deny != nil { - if r := c.perms.pub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 { + if c.perms.pub.deny.HasInterest(sub) { deniedPub = append(deniedPub, sub) } } if c.perms.sub.deny != nil { - if r := c.perms.sub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 { + if c.perms.sub.deny.HasInterest(sub) { deniedSub = append(deniedSub, sub) } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/avl/seqset.go b/vendor/github.com/nats-io/nats-server/v2/server/avl/seqset.go index 8d40450215..0b6aca67cd 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/avl/seqset.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/avl/seqset.go @@ -14,10 +14,11 @@ package avl import ( + "cmp" "encoding/binary" "errors" "math/bits" - "sort" + "slices" ) // SequenceSet is a memory and encoding optimized set for storing unsigned ints. @@ -209,7 +210,7 @@ func Union(ssa ...*SequenceSet) *SequenceSet { return nil } // Sort so we can clone largest. - sort.Slice(ssa, func(i, j int) bool { return ssa[i].Size() > ssa[j].Size() }) + slices.SortFunc(ssa, func(i, j *SequenceSet) int { return -cmp.Compare(i.Size(), j.Size()) }) // reverse order ss := ssa[0].Clone() // Insert the rest through range call. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/certidp/certidp.go b/vendor/github.com/nats-io/nats-server/v2/server/certidp/certidp.go index c1d9678231..a26618577b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/certidp/certidp.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/certidp/certidp.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -222,7 +222,7 @@ func CertOCSPEligible(link *ChainLink) bool { if link == nil || link.Leaf.Raw == nil || len(link.Leaf.Raw) == 0 { return false } - if link.Leaf.OCSPServer == nil || len(link.Leaf.OCSPServer) == 0 { + if len(link.Leaf.OCSPServer) == 0 { return false } urls := getWebEndpoints(link.Leaf.OCSPServer) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go b/vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go index 6e210f2b5d..ad6c2651bc 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,6 +15,7 @@ package certidp import ( "encoding/base64" + "errors" "fmt" "io" "net/http" @@ -26,7 +27,7 @@ import ( func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte, error) { if link == nil || link.Leaf == nil || link.Issuer == nil || opts == nil || log == nil { - return nil, fmt.Errorf(ErrInvalidChainlink) + return nil, errors.New(ErrInvalidChainlink) } timeout := time.Duration(opts.Timeout * float64(time.Second)) @@ -59,7 +60,7 @@ func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte, responders := *link.OCSPWebEndpoints if len(responders) == 0 { - return nil, fmt.Errorf(ErrNoAvailOCSPServers) + return nil, errors.New(ErrNoAvailOCSPServers) } var raw []byte diff --git a/vendor/github.com/nats-io/nats-server/v2/server/certstore/certstore_windows.go b/vendor/github.com/nats-io/nats-server/v2/server/certstore/certstore_windows.go index 57adc187ab..19b9567be7 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/certstore/certstore_windows.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/certstore/certstore_windows.go @@ -115,19 +115,38 @@ var ( winMyStore = winWide("MY") // These DLLs must be available on all Windows hosts - winCrypt32 = windows.MustLoadDLL("crypt32.dll") - winNCrypt = windows.MustLoadDLL("ncrypt.dll") + winCrypt32 = windows.NewLazySystemDLL("crypt32.dll") + winNCrypt = windows.NewLazySystemDLL("ncrypt.dll") - winCertFindCertificateInStore = winCrypt32.MustFindProc("CertFindCertificateInStore") - winCryptAcquireCertificatePrivateKey = winCrypt32.MustFindProc("CryptAcquireCertificatePrivateKey") - winNCryptExportKey = winNCrypt.MustFindProc("NCryptExportKey") - winNCryptOpenStorageProvider = winNCrypt.MustFindProc("NCryptOpenStorageProvider") - winNCryptGetProperty = winNCrypt.MustFindProc("NCryptGetProperty") - winNCryptSignHash = winNCrypt.MustFindProc("NCryptSignHash") + winCertFindCertificateInStore = winCrypt32.NewProc("CertFindCertificateInStore") + winCryptAcquireCertificatePrivateKey = winCrypt32.NewProc("CryptAcquireCertificatePrivateKey") + winNCryptExportKey = winNCrypt.NewProc("NCryptExportKey") + winNCryptOpenStorageProvider = winNCrypt.NewProc("NCryptOpenStorageProvider") + winNCryptGetProperty = winNCrypt.NewProc("NCryptGetProperty") + winNCryptSignHash = winNCrypt.NewProc("NCryptSignHash") winFnGetProperty = winGetProperty ) +func init() { + for _, d := range []*windows.LazyDLL{ + winCrypt32, winNCrypt, + } { + if err := d.Load(); err != nil { + panic(err) + } + } + for _, p := range []*windows.LazyProc{ + winCertFindCertificateInStore, winCryptAcquireCertificatePrivateKey, + winNCryptExportKey, winNCryptOpenStorageProvider, + winNCryptGetProperty, winNCryptSignHash, + } { + if err := p.Find(); err != nil { + panic(err) + } + } +} + type winPKCS1PaddingInfo struct { pszAlgID *uint16 } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index 99134bd0c5..fa0b445d27 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -312,6 +312,8 @@ type outbound struct { cw *s2.Writer } +const nbMaxVectorSize = 1024 // == IOV_MAX on Linux/Darwin and most other Unices (except Solaris/AIX) + const nbPoolSizeSmall = 512 // Underlying array size of small buffer const nbPoolSizeMedium = 4096 // Underlying array size of medium buffer const nbPoolSizeLarge = 65536 // Underlying array size of large buffer @@ -1611,7 +1613,7 @@ func (c *client) flushOutbound() bool { // referenced in c.out.nb (which can be modified in queueOutboud() while // the lock is released). c.out.wnb = append(c.out.wnb, collapsed...) - var _orig [1024][]byte + var _orig [nbMaxVectorSize][]byte orig := append(_orig[:0], c.out.wnb...) // Since WriteTo is lopping things off the beginning, we need to remember @@ -1622,13 +1624,31 @@ func (c *client) flushOutbound() bool { // flush here start := time.Now() - // FIXME(dlc) - writev will do multiple IOs past 1024 on - // most platforms, need to account for that with deadline? - nc.SetWriteDeadline(start.Add(wdl)) + var n int64 // Total bytes written + var wn int64 // Bytes written per loop + var err error // Error from last write, if any + for len(c.out.wnb) > 0 { + // Limit the number of vectors to no more than nbMaxVectorSize, + // which if 1024, will mean a maximum of 64MB in one go. + wnb := c.out.wnb + if len(wnb) > nbMaxVectorSize { + wnb = wnb[:nbMaxVectorSize] + } + consumed := len(wnb) - // Actual write to the socket. - n, err := c.out.wnb.WriteTo(nc) - nc.SetWriteDeadline(time.Time{}) + // Actual write to the socket. + nc.SetWriteDeadline(start.Add(wdl)) + wn, err = wnb.WriteTo(nc) + nc.SetWriteDeadline(time.Time{}) + + // Update accounting, move wnb slice onwards if needed, or stop + // if a write error was reported that wasn't a short write. + n += wn + c.out.wnb = c.out.wnb[consumed-len(wnb):] + if err != nil && err != io.ErrShortWrite { + break + } + } lft := time.Since(start) @@ -1810,7 +1830,9 @@ func (c *client) markConnAsClosed(reason ClosedState) { if nc := c.nc; nc != nil && c.srv != nil { // TODO: May want to send events to single go routine instead // of creating a new go routine for each save. - go c.srv.saveClosedClient(c, nc, reason) + // Pass the c.subs as a reference. It may be set to nil in + // closeConnection. + go c.srv.saveClosedClient(c, nc, c.subs, reason) } } // If writeLoop exists, let it do the final flush, close and teardown. @@ -2950,7 +2972,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri if err := im.acc.sl.Insert(&nsub); err != nil { errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name) c.Debugf(errs) - return nil, fmt.Errorf(errs) + return nil, errors.New(errs) } // Update our route map here. But only if we are not a leaf node or a hub leafnode. @@ -3886,9 +3908,8 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { // Match may use the subject here to populate a cache, so can not use bytesToString here. r = acc.sl.Match(string(c.pa.subject)) if len(r.psubs)+len(r.qsubs) > 0 { - c.in.results[string(c.pa.subject)] = r // Prune the results cache. Keeps us from unbounded growth. Random delete. - if len(c.in.results) > maxResultCacheSize { + if len(c.in.results) >= maxResultCacheSize { n := 0 for subject := range c.in.results { delete(c.in.results, subject) @@ -3897,6 +3918,8 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { } } } + // Then add the new cache entry. + c.in.results[string(c.pa.subject)] = r } } @@ -3964,7 +3987,7 @@ func (c *client) subForReply(reply []byte) *subscription { func (c *client) handleGWReplyMap(msg []byte) bool { // Check for leaf nodes if c.srv.gwLeafSubs.Count() > 0 { - if r := c.srv.gwLeafSubs.Match(string(c.pa.subject)); len(r.psubs) > 0 { + if r := c.srv.gwLeafSubs.MatchBytes(c.pa.subject); len(r.psubs) > 0 { c.processMsgResults(c.acc, r, msg, c.pa.deliver, c.pa.subject, c.pa.reply, pmrNoFlag) } } @@ -5284,6 +5307,14 @@ func (c *client) closeConnection(reason ClosedState) { } } + // Now that we are done with subscriptions, clear the field so that the + // connection can be released and gc'ed. + if kind == CLIENT || kind == LEAF { + c.mu.Lock() + c.subs = nil + c.mu.Unlock() + } + // Don't reconnect connections that have been marked with // the no reconnect flag. if noReconnect { @@ -5436,17 +5467,19 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) { if !ok { if c.kind == ROUTER && len(c.route.accName) > 0 { - acc = c.acc + if acc = c.acc; acc == nil { + return nil, nil + } } else { // Match correct account and sublist. - if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil { + if acc, _ = c.srv.LookupAccount(bytesToString(c.pa.account)); acc == nil { return nil, nil } } sl := acc.sl // Match against the account sublist. - r = sl.Match(string(c.pa.subject)) + r = sl.MatchBytes(c.pa.subject) // Check if we need to prune. if len(c.in.pacache) >= maxPerAccountCacheSize { @@ -5824,6 +5857,14 @@ func (c *client) Warnf(format string, v ...any) { c.srv.Warnf(format, v...) } +func (c *client) rateLimitFormatWarnf(format string, v ...any) { + if _, loaded := c.srv.rateLimitLogging.LoadOrStore(format, time.Now()); loaded { + return + } + statement := fmt.Sprintf(format, v...) + c.Warnf("%s", statement) +} + func (c *client) RateLimitWarnf(format string, v ...any) { // Do the check before adding the client info to the format... statement := fmt.Sprintf(format, v...) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index 0a60902657..ea43ac4f02 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -55,7 +55,7 @@ func init() { const ( // VERSION is the current version for the server. - VERSION = "2.10.18" + VERSION = "2.10.20" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index f797073275..38af7da959 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -21,7 +21,7 @@ import ( "fmt" "math/rand" "reflect" - "sort" + "slices" "strconv" "strings" "sync" @@ -712,7 +712,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri mset.mu.RLock() s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc - retention := cfg.Retention mset.mu.RUnlock() // If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn. @@ -735,7 +734,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // Make sure we have sane defaults. Do so with the JS lock, otherwise a // badly timed meta snapshot can result in a race condition. mset.js.mu.Lock() - setConsumerConfigDefaults(config, &mset.cfg, srvLim, selectedLimits) + setConsumerConfigDefaults(config, &cfg, srvLim, selectedLimits) mset.js.mu.Unlock() if err := checkConsumerCfg(config, srvLim, &cfg, acc, selectedLimits, isRecovering); err != nil { @@ -781,7 +780,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, NewJSConsumerAlreadyExistsError() } // Check for overlapping subjects if we are a workqueue - if mset.cfg.Retention == WorkQueuePolicy { + if cfg.Retention == WorkQueuePolicy { subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) if !mset.partitionUnique(cName, subjects) { return nil, NewJSConsumerWQConsumerNotUniqueError() @@ -803,7 +802,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // but if not we use the value from account limits, if account limits is more restrictive // than stream config we prefer the account limits to handle cases where account limits are // updated during the lifecycle of the stream - maxc := mset.cfg.MaxConsumers + maxc := cfg.MaxConsumers if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) { maxc = selectedLimits.MaxConsumers } @@ -813,7 +812,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } // Check on stream type conflicts with WorkQueues. - if mset.cfg.Retention == WorkQueuePolicy && !config.Direct { + if cfg.Retention == WorkQueuePolicy && !config.Direct { // Force explicit acks here. if config.AckPolicy != AckExplicit { mset.mu.Unlock() @@ -871,7 +870,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri sfreq: int32(sampleFreq), maxdc: uint64(config.MaxDeliver), maxp: config.MaxAckPending, - retention: retention, + retention: cfg.Retention, created: time.Now().UTC(), } @@ -904,17 +903,17 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } // Create ackMsgs queue now that we have a consumer name - o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name)) + o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, cfg.Name)) // Create our request waiting queue. if o.isPullMode() { o.waiting = newWaitQueue(config.MaxWaiting) // Create our internal queue for next msg requests. - o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name)) + o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, cfg.Name)) } // already under lock, mset.Name() would deadlock - o.stream = mset.cfg.Name + o.stream = cfg.Name o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name o.nakEventT = JSAdvisoryConsumerMsgNakPre + "." + o.stream + "." + o.name o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name @@ -999,7 +998,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // that to scanf them back in. // Escape '%' in consumer and stream names, as `pre` is used as a template later // in consumer.ackReply(), resulting in erroneous formatting of the ack subject. - mn := strings.ReplaceAll(mset.cfg.Name, "%", "%%") + mn := strings.ReplaceAll(cfg.Name, "%", "%%") pre := fmt.Sprintf(jsAckT, mn, strings.ReplaceAll(o.name, "%", "%%")) o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre) o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre) @@ -1011,9 +1010,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if o.isPushMode() { // Check if we are running only 1 replica and that the delivery subject has interest. // Check in place here for interest. Will setup properly in setLeader. - if config.replicas(&mset.cfg) == 1 { - r := o.acc.sl.Match(o.cfg.DeliverSubject) - if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) { + if config.replicas(&cfg) == 1 { + interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject) + if !o.hasDeliveryInterest(interest) { // Let the interest come to us eventually, but setup delete timer. o.updateDeliveryInterest(false) } @@ -1193,7 +1192,7 @@ func (o *consumer) setLeader(isLeader bool) { } mset.mu.RLock() - s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq + s, jsa, stream, lseq := mset.srv, mset.jsa, mset.getCfgName(), mset.lseq mset.mu.RUnlock() o.mu.Lock() @@ -1508,7 +1507,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool { gw.RLock() defer gw.RUnlock() for _, gwc := range gw.outo { - psi, qr := gwc.gatewayInterest(account, subject) + psi, qr := gwc.gatewayInterest(account, stringToBytes(subject)) if psi || qr != nil { return true } @@ -1697,7 +1696,7 @@ func (o *consumer) forceExpirePending() { } } if len(expired) > 0 { - sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] }) + slices.Sort(expired) o.addToRedeliverQueue(expired...) // Now we should update the timestamp here since we are redelivering. // We will use an incrementing time to preserve order for any other redelivery. @@ -1745,6 +1744,8 @@ func (o *consumer) setRateLimit(bps uint64) { // Burst should be set to maximum msg size for this account, etc. var burst int + // We don't need to get cfgMu's rlock here since this function + // is already invoked under mset.mu.RLock(), which superseeds cfgMu. if mset.cfg.MaxMsgSize > 0 { burst = int(mset.cfg.MaxMsgSize) } else if mset.jsa.account.limits.mpay > 0 { @@ -1802,8 +1803,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { if ncfg.DeliverSubject == _EMPTY_ { return errors.New("can not update push consumer to pull based") } - rr := acc.sl.Match(cfg.DeliverSubject) - if len(rr.psubs)+len(rr.qsubs) != 0 { + if acc.sl.HasInterest(cfg.DeliverSubject) { return NewJSConsumerNameExistError() } } @@ -2028,9 +2028,10 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): - o.processAckMsg(sseq, dseq, dc, reply, true) - // We handle replies for acks in updateAcks - skipAckReply = true + if !o.processAckMsg(sseq, dseq, dc, reply, true) { + // We handle replies for acks in updateAcks + skipAckReply = true + } case bytes.HasPrefix(msg, AckNext): o.processAckMsg(sseq, dseq, dc, _EMPTY_, true) o.processNextMsgRequest(reply, msg[len(AckNext):]) @@ -2044,9 +2045,10 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { if buf := msg[len(AckTerm):]; len(buf) > 0 { reason = string(bytes.TrimSpace(buf)) } - o.processTerm(sseq, dseq, dc, reason, reply) - // We handle replies for acks in updateAcks - skipAckReply = true + if !o.processTerm(sseq, dseq, dc, reason, reply) { + // We handle replies for acks in updateAcks + skipAckReply = true + } } // Ack the ack if requested. @@ -2396,9 +2398,12 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { } // Process a TERM -func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) { +// Returns `true` if the ack was processed in place and the sender can now respond +// to the client, or `false` if there was an error or the ack is replicated (in which +// case the reply will be sent later). +func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool { // Treat like an ack to suppress redelivery. - o.processAckMsg(sseq, dseq, dc, reply, false) + ackedInPlace := o.processAckMsg(sseq, dseq, dc, reply, false) o.mu.Lock() defer o.mu.Unlock() @@ -2421,11 +2426,14 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) { j, err := json.Marshal(e) if err != nil { - return + // We had an error during the marshal, so we can't send the advisory, + // but we still need to tell the caller that the ack was processed. + return ackedInPlace } subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name o.sendAdvisory(subj, j) + return ackedInPlace } // Introduce a small delay in when timer fires to check pending. @@ -2458,7 +2466,7 @@ func (o *consumer) checkRedelivered(slseq uint64) { if shouldUpdateState { if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed { s, acc, mset, name := o.srv, o.acc, o.mset, o.name - s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err) + s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.getCfgName(), name, err) } } } @@ -2734,11 +2742,15 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { o.sendAdvisory(o.ackEventT, j) } -func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) { +// Process an ACK. +// Returns `true` if the ack was processed in place and the sender can now respond +// to the client, or `false` if there was an error or the ack is replicated (in which +// case the reply will be sent later). +func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) bool { o.mu.Lock() if o.closed { o.mu.Unlock() - return + return false } // Check if this ack is above the current pointer to our next to deliver. @@ -2750,9 +2762,14 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b mset := o.mset if mset == nil || mset.closed.Load() { o.mu.Unlock() - return + return false } + // Let the owning stream know if we are interest or workqueue retention based. + // If this consumer is clustered (o.node != nil) this will be handled by + // processReplicatedAck after the ack has propagated. + ackInPlace := o.node == nil && o.retention != LimitsPolicy + var sgap, floor uint64 var needSignal bool @@ -2791,7 +2808,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() - return + return ackInPlace } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true @@ -2823,22 +2840,19 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b case AckNone: // FIXME(dlc) - This is error but do we care? o.mu.Unlock() - return + return ackInPlace } + // No ack replication, so we set reply to "" so that updateAcks does not + // send the reply. The caller will. + if ackInPlace { + reply = _EMPTY_ + } // Update underlying store. o.updateAcks(dseq, sseq, reply) - - // In case retention changes for a stream, this ought to have been updated - // using the consumer lock to avoid a race. - retention := o.retention - clustered := o.node != nil o.mu.Unlock() - // Let the owning stream know if we are interest or workqueue retention based. - // If this consumer is clustered this will be handled by processReplicatedAck - // after the ack has propagated. - if !clustered && mset != nil && retention != LimitsPolicy { + if ackInPlace { if sgap > 1 { // FIXME(dlc) - This can very inefficient, will need to fix. for seq := sseq; seq >= floor; seq-- { @@ -2853,6 +2867,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b if needSignal { o.signalNewMessages() } + return ackInPlace } // Determine if this is a truly filtered consumer. Modern clients will place filtered subjects @@ -2869,16 +2884,21 @@ func (o *consumer) isFiltered() bool { return true } + // Protect access to mset.cfg with the cfgMu mutex. + mset.cfgMu.RLock() + msetSubjects := mset.cfg.Subjects + mset.cfgMu.RUnlock() + // `isFiltered` need to be performant, so we do // as any checks as possible to avoid unnecessary work. // Here we avoid iteration over slices if there is only one subject in stream // and one filter for the consumer. - if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 { - return mset.cfg.Subjects[0] != o.subjf[0].subject + if len(msetSubjects) == 1 && len(o.subjf) == 1 { + return msetSubjects[0] != o.subjf[0].subject } // if the list is not equal length, we can return early, as this is filtered. - if len(mset.cfg.Subjects) != len(o.subjf) { + if len(msetSubjects) != len(o.subjf) { return true } @@ -2890,7 +2910,7 @@ func (o *consumer) isFiltered() bool { for _, val := range o.subjf { cfilters[val.subject] = struct{}{} } - for _, val := range mset.cfg.Subjects { + for _, val := range msetSubjects { if _, ok := cfilters[val]; !ok { return true } @@ -2898,6 +2918,28 @@ func (o *consumer) isFiltered() bool { return false } +// Check if we would have matched and needed an ack for this store seq. +// This is called for interest based retention streams to remove messages. +func (o *consumer) matchAck(sseq uint64) bool { + o.mu.RLock() + defer o.mu.RUnlock() + + // Check if we are filtered, and if so check if this is even applicable to us. + if o.isFiltered() { + if o.mset == nil { + return false + } + var svp StoreMsg + if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { + return false + } + if !o.isFilteredMatch(svp.subj) { + return false + } + } + return true +} + // Check if we need an ack for this store seq. // This is called for interest based retention streams to remove messages. func (o *consumer) needAck(sseq uint64, subj string) bool { @@ -3199,8 +3241,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { } if wr.expires.IsZero() || time.Now().Before(wr.expires) { - rr := wr.acc.sl.Match(wr.interest) - if len(rr.psubs)+len(rr.qsubs) > 0 { + if wr.acc.sl.HasInterest(wr.interest) { return o.waiting.pop() } else if time.Since(wr.received) < defaultGatewayRecentSubExpiration && (o.srv.leafNodeEnabled || o.srv.gateway.enabled) { return o.waiting.pop() @@ -3627,8 +3668,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { continue } // Now check interest. - rr := wr.acc.sl.Match(wr.interest) - interest := len(rr.psubs)+len(rr.qsubs) > 0 + interest := wr.acc.sl.HasInterest(wr.interest) if !interest && (s.leafNodeEnabled || s.gateway.enabled) { // If we are here check on gateways and leaf nodes (as they can mask gateways on the other end). // If we have interest or the request is too young break and do not expire. @@ -3907,10 +3947,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { inch := o.inch o.mu.Unlock() - // Grab the stream's retention policy - mset.mu.RLock() - rp := mset.cfg.Retention - mset.mu.RUnlock() + // Grab the stream's retention policy and name + mset.cfgMu.RLock() + stream, rp := mset.cfg.Name, mset.cfg.Retention + mset.cfgMu.RUnlock() var err error @@ -3966,12 +4006,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { goto waitForMsgs } else if err == errPartialCache { s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'", - o.mset.acc, o.mset.cfg.Name, o.cfg.Name) + o.mset.acc, stream, o.cfg.Name) goto waitForMsgs } else { s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v", - o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err) + o.mset.acc, stream, o.cfg.Name, err) goto waitForMsgs } } @@ -4622,7 +4662,7 @@ func (o *consumer) checkPending() { if len(expired) > 0 { // We need to sort. - sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] }) + slices.Sort(expired) o.addToRedeliverQueue(expired...) // Now we should update the timestamp here since we are redelivering. // We will use an incrementing time to preserve order for any other redelivery. @@ -4656,7 +4696,7 @@ func (o *consumer) checkPending() { if shouldUpdateState { if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed { s, acc, mset, name := o.srv, o.acc, o.mset, o.name - s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err) + s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.getCfgName(), name, err) } } } @@ -4777,7 +4817,10 @@ func (o *consumer) selectStartingSeqNo() { } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { // If our parent stream is set to max msgs per subject of 1 this is just // a normal consumer at this point. We can avoid any heavy lifting. - if o.mset.cfg.MaxMsgsPer == 1 { + o.mset.cfgMu.RLock() + mmp := o.mset.cfg.MaxMsgsPer + o.mset.cfgMu.RUnlock() + if mmp == 1 { o.sseq = state.FirstSeq } else { // A threshold for when we switch from get last msg to subjects state. @@ -4807,9 +4850,7 @@ func (o *consumer) selectStartingSeqNo() { } // Sort the skip list if needed. if len(lss.seqs) > 1 { - sort.Slice(lss.seqs, func(i, j int) bool { - return lss.seqs[j] > lss.seqs[i] - }) + slices.Sort(lss.seqs) } if len(lss.seqs) == 0 { o.sseq = state.LastSeq @@ -4848,12 +4889,16 @@ func (o *consumer) selectStartingSeqNo() { o.sseq = o.cfg.OptStartSeq } - if state.FirstSeq == 0 { - o.sseq = 1 - } else if o.sseq < state.FirstSeq { - o.sseq = state.FirstSeq - } else if o.sseq > state.LastSeq { - o.sseq = state.LastSeq + 1 + // Only clip the sseq if the OptStartSeq is not provided, otherwise + // it's possible that the stream just doesn't contain OptStartSeq yet. + if o.cfg.OptStartSeq == 0 { + if state.FirstSeq == 0 { + o.sseq = 1 + } else if o.sseq < state.FirstSeq { + o.sseq = state.FirstSeq + } else if o.sseq > state.LastSeq { + o.sseq = state.LastSeq + 1 + } } } @@ -4932,9 +4977,9 @@ func (o *consumer) isActive() bool { // hasNoLocalInterest return true if we have no local interest. func (o *consumer) hasNoLocalInterest() bool { o.mu.RLock() - rr := o.acc.sl.Match(o.cfg.DeliverSubject) + interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject) o.mu.RUnlock() - return len(rr.psubs)+len(rr.qsubs) == 0 + return !interest } // This is when the underlying stream has been purged. @@ -5168,6 +5213,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { if mset != nil { mset.mu.Lock() mset.removeConsumer(o) + // No need for cfgMu's lock since mset.mu.Lock superseeds it. rp = mset.cfg.Retention mset.mu.Unlock() } @@ -5298,13 +5344,13 @@ func (o *consumer) switchToEphemeral() { o.mu.Lock() o.cfg.Durable = _EMPTY_ store, ok := o.store.(*consumerFileStore) - rr := o.acc.sl.Match(o.cfg.DeliverSubject) + interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject) // Setup dthresh. o.updateInactiveThreshold(&o.cfg) o.mu.Unlock() // Update interest - o.updateDeliveryInterest(len(rr.psubs)+len(rr.qsubs) > 0) + o.updateDeliveryInterest(interest) // Write out new config if ok { store.updateConfig(o.cfg) @@ -5499,7 +5545,9 @@ func (o *consumer) checkStateForInterestStream() error { } for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ { - mset.ackMsg(o, seq) + if o.matchAck(seq) { + mset.ackMsg(o, seq) + } } o.mu.RLock() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/errors.go b/vendor/github.com/nats-io/nats-server/v2/server/errors.go index 8efa7ac02e..c096bbef92 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/errors.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/errors.go @@ -180,6 +180,9 @@ var ( // ErrClusterNameRemoteConflict signals that a remote server has a different cluster name. ErrClusterNameRemoteConflict = errors.New("cluster name from remote server conflicts") + // ErrClusterNameHasSpaces signals that the cluster name contains spaces, which is not allowed. + ErrClusterNameHasSpaces = errors.New("cluster name cannot contain spaces or new lines") + // ErrMalformedSubject is returned when a subscription is made with a subject that does not conform to subject rules. ErrMalformedSubject = errors.New("malformed subject") diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go index 2ebfc5ebac..39de871f98 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go @@ -130,6 +130,7 @@ type internal struct { replies map[string]msgHandler sendq *ipQueue[*pubMsg] recvq *ipQueue[*inSysMsg] + recvqp *ipQueue[*inSysMsg] // For STATSZ/Pings resetCh chan struct{} wg sync.WaitGroup sq *sendq @@ -412,15 +413,7 @@ type TypedEvent struct { // internalReceiveLoop will be responsible for dispatching all messages that // a server receives and needs to internally process, e.g. internal subs. -func (s *Server) internalReceiveLoop() { - s.mu.RLock() - if s.sys == nil || s.sys.recvq == nil { - s.mu.RUnlock() - return - } - recvq := s.sys.recvq - s.mu.RUnlock() - +func (s *Server) internalReceiveLoop(recvq *ipQueue[*inSysMsg]) { for s.eventsRunning() { select { case <-recvq.ch: @@ -1139,7 +1132,7 @@ func (s *Server) initEventTracking() { } // Listen for ping messages that will be sent to all servers for statsz. // This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below - if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil { + if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallbackStatsz(s.statszReq)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) return } @@ -1182,23 +1175,42 @@ func (s *Server) initEventTracking() { optz := &HealthzEventOptions{} s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.healthz(&optz.HealthzOptions), nil }) }, - "PROFILEZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { - optz := &ProfilezEventOptions{} - s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.profilez(&optz.ProfilezOptions), nil }) - }, + "PROFILEZ": nil, // Special case, see below "EXPVARZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &ExpvarzEventOptions{} s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.expvarz(optz), nil }) }, } + profilez := func(_ *subscription, c *client, _ *Account, _, rply string, rmsg []byte) { + hdr, msg := c.msgParts(rmsg) + // Need to copy since we are passing those to the go routine below. + hdr, msg = copyBytes(hdr), copyBytes(msg) + // Execute in its own go routine because CPU profiling, for instance, + // could take several seconds to complete. + go func() { + optz := &ProfilezEventOptions{} + s.zReq(c, rply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { + return s.profilez(&optz.ProfilezOptions), nil + }) + }() + } for name, req := range monSrvc { + var h msgHandler + switch name { + case "PROFILEZ": + h = profilez + case "STATSZ": + h = s.noInlineCallbackStatsz(req) + default: + h = s.noInlineCallback(req) + } subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name) - if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { + if _, err := s.sysSubscribe(subject, h); err != nil { s.Errorf("Error setting up internal tracking: %v", err) return } subject = fmt.Sprintf(serverPingReqSubj, name) - if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { + if _, err := s.sysSubscribe(subject, h); err != nil { s.Errorf("Error setting up internal tracking: %v", err) return } @@ -2451,16 +2463,39 @@ func (s *Server) sendAccountAuthErrorEvent(c *client, acc *Account, reason strin // rmsg contains header and the message. use client.msgParts(rmsg) to split them apart type msgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, rmsg []byte) +const ( + recvQMuxed = 1 + recvQStatsz = 2 +) + // Create a wrapped callback handler for the subscription that will move it to an // internal recvQ for processing not inline with routes etc. func (s *Server) noInlineCallback(cb sysMsgHandler) msgHandler { + return s.noInlineCallbackRecvQSelect(cb, recvQMuxed) +} + +// Create a wrapped callback handler for the subscription that will move it to an +// internal recvQ for Statsz/Pings for processing not inline with routes etc. +func (s *Server) noInlineCallbackStatsz(cb sysMsgHandler) msgHandler { + return s.noInlineCallbackRecvQSelect(cb, recvQStatsz) +} + +// Create a wrapped callback handler for the subscription that will move it to an +// internal IPQueue for processing not inline with routes etc. +func (s *Server) noInlineCallbackRecvQSelect(cb sysMsgHandler, recvQSelect int) msgHandler { s.mu.RLock() if !s.eventsEnabled() { s.mu.RUnlock() return nil } // Capture here for direct reference to avoid any unnecessary blocking inline with routes, gateways etc. - recvq := s.sys.recvq + var recvq *ipQueue[*inSysMsg] + switch recvQSelect { + case recvQStatsz: + recvq = s.sys.recvqp + default: + recvq = s.sys.recvq + } s.mu.RUnlock() return func(sub *subscription, c *client, acc *Account, subj, rply string, rmsg []byte) { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index e059d28bec..16d3ef05d4 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -27,10 +27,12 @@ import ( "fmt" "hash" "io" + "io/fs" "math" "net" "os" "path/filepath" + "slices" "sort" "strings" "sync" @@ -237,6 +239,7 @@ type msgBlock struct { noTrack bool needSync bool syncAlways bool + noCompact bool closed bool // Used to mock write failures. @@ -765,9 +768,7 @@ func (fs *fileStore) setupAEK() error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - <-dios - err = os.WriteFile(keyFile, encrypted, defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms) if err != nil { return err } @@ -803,9 +804,7 @@ func (fs *fileStore) writeStreamMeta() error { b = fs.aek.Seal(nonce, nonce, b, nil) } - <-dios - err = os.WriteFile(meta, b, defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(meta, b, defaultFilePerms) if err != nil { return err } @@ -813,9 +812,7 @@ func (fs *fileStore) writeStreamMeta() error { fs.hh.Write(b) checksum := hex.EncodeToString(fs.hh.Sum(nil)) sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) - <-dios - err = os.WriteFile(sum, []byte(checksum), defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms) if err != nil { return err } @@ -1074,7 +1071,7 @@ func (fs *fileStore) addLostData(ld *LostStreamData) { } if added { msgs := fs.ld.Msgs - sort.Slice(msgs, func(i, j int) bool { return msgs[i] < msgs[j] }) + slices.Sort(msgs) fs.ld.Bytes += ld.Bytes } } else { @@ -1084,17 +1081,10 @@ func (fs *fileStore) addLostData(ld *LostStreamData) { // Helper to see if we already have this sequence reported in our lost data. func (ld *LostStreamData) exists(seq uint64) (int, bool) { - i, found := sort.Find(len(ld.Msgs), func(i int) int { - tseq := ld.Msgs[i] - if tseq < seq { - return -1 - } - if tseq > seq { - return +1 - } - return 0 + i := slices.IndexFunc(ld.Msgs, func(i uint64) bool { + return i == seq }) - return i, found + return i, i > -1 } func (fs *fileStore) removeFromLostData(seq uint64) { @@ -1206,9 +1196,7 @@ func (mb *msgBlock) convertCipher() error { // the old keyfile back. if err := fs.genEncryptionKeysForBlock(mb); err != nil { keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)) - <-dios - os.WriteFile(keyFile, ekey, defaultFilePerms) - dios <- struct{}{} + fs.writeFileWithOptionalSync(keyFile, ekey, defaultFilePerms) return err } mb.bek.XORKeyStream(buf, buf) @@ -1249,6 +1237,13 @@ func (mb *msgBlock) convertToEncrypted() error { return nil } +// Return the mb's index. +func (mb *msgBlock) getIndex() uint32 { + mb.mu.RLock() + defer mb.mu.RUnlock() + return mb.index +} + // Rebuild the state of the blk based on what we have on disk in the N.blk file. // We will return any lost data, and we will return any delete tombstones we encountered. func (mb *msgBlock) rebuildState() (*LostStreamData, []uint64, error) { @@ -2607,9 +2602,14 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { return ss } -// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index. -// Will return -1 if no matches at all. -func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) { +// This is used to see if we can selectively jump start blocks based on filter subject and a starting block index. +// Will return -1 and ErrStoreEOF if no matches at all or no more from where we are. +func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool, bi int) (int, error) { + // If we match everything, just move to next blk. + if filter == _EMPTY_ || filter == fwcs { + return bi + 1, nil + } + // Move through psim to gather start and stop bounds. start, stop := uint32(math.MaxUint32), uint32(0) if wc { fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) { @@ -2623,51 +2623,26 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) { } else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok { start, stop = psi.fblk, psi.lblk } - // Nothing found. + // Nothing was found. if start == uint32(math.MaxUint32) { - return -1, -1 + return -1, ErrStoreEOF } - // Here we need to translate this to index into fs.blks properly. - mb := fs.bim[start] - if mb == nil { - // psim fblk can be lazy. - i := start + 1 - for ; i <= stop; i++ { - mb = fs.bim[i] - if mb == nil { - continue - } - if _, f, _ := mb.filteredPending(filter, wc, 0); f > 0 { - break - } - } - // Update fblk since fblk was outdated. - if !wc { - if psi, ok := fs.psim.Find(stringToBytes(filter)); ok { - psi.fblk = i - } - } else { - fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) { - if i > psi.fblk { - psi.fblk = i - } - }) + // Can not be nil so ok to inline dereference. + mbi := fs.blks[bi].getIndex() + // All matching msgs are behind us. + // Less than AND equal is important because we were called because we missed searching bi. + if stop <= mbi { + return -1, ErrStoreEOF + } + // If start is > index return dereference of fs.blks index. + if start > mbi { + if mb := fs.bim[start]; mb != nil { + ni, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq)) + return ni, nil } } - // Still nothing. - if mb == nil { - return -1, -1 - } - // Grab first index. - fi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq)) - - // Grab last if applicable. - var li int - if mb = fs.bim[stop]; mb != nil { - li, _ = fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq)) - } - - return fi, li + // Otherwise just bump to the next one. + return bi + 1, nil } // Optimized way for getting all num pending matching a filter subject. @@ -3283,9 +3258,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - <-dios - err = os.WriteFile(keyFile, encrypted, defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms) if err != nil { return err } @@ -3987,6 +3960,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( mb.bytes = 0 } + // Allow us to check compaction again. + mb.noCompact = false + // Mark as dirty for stream state. fs.dirty++ @@ -4029,7 +4005,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // All other more thorough cleanup will happen in syncBlocks logic. // Note that we do not have to store empty records for the deleted, so don't use to calculate. // TODO(dlc) - This should not be inline, should kick the sync routine. - if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock { + if !isLastBlock && mb.shouldCompactInline() { mb.compact() } } @@ -4091,6 +4067,21 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( return true, nil } +// Tests whether we should try to compact this block while inline removing msgs. +// We will want rbytes to be over the minimum and have a 2x potential savings. +// Lock should be held. +func (mb *msgBlock) shouldCompactInline() bool { + return mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes +} + +// Tests whether we should try to compact this block while running periodic sync. +// We will want rbytes to be over the minimum and have a 2x potential savings. +// Ignores 2MB minimum. +// Lock should be held. +func (mb *msgBlock) shouldCompactSync() bool { + return mb.bytes*2 < mb.rbytes && !mb.noCompact +} + // This will compact and rewrite this block. This should only be called when we know we want to rewrite this block. // This should not be called on the lmb since we will prune tail deleted messages which could cause issues with // writing new messages. We will silently bail on any issues with the underlying block and let someone else detect. @@ -4197,7 +4188,12 @@ func (mb *msgBlock) compact() { mb.needSync = true // Capture the updated rbytes. - mb.rbytes = uint64(len(nbuf)) + if rbytes := uint64(len(nbuf)); rbytes == mb.rbytes { + // No change, so set our noCompact bool here to avoid attempting to continually compress in syncBlocks. + mb.noCompact = true + } else { + mb.rbytes = rbytes + } // Remove any seqs from the beginning of the blk. for seq, nfseq := fseq, atomic.LoadUint64(&mb.first.seq); seq < nfseq; seq++ { @@ -4984,6 +4980,9 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte } // Write index mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit) + } else { + // Make sure to account for tombstones in rbytes. + mb.rbytes += rl } fch, werr := mb.fch, mb.werr @@ -5327,7 +5326,7 @@ func (fs *fileStore) syncBlocks() { // Check if we should compact here as well. // Do not compact last mb. var needsCompact bool - if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes { + if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.shouldCompactSync() { needsCompact = true markDirty = true } @@ -6424,10 +6423,13 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store // Nothing found in this block. We missed, if first block (bi) check psim. // Similar to above if start <= first seq. // TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers. - if i == bi { - nbi, lbi := fs.checkSkipFirstBlock(filter, wc) + // We should not do this at all if we are already on the last block. + // Also if we are a wildcard do not check if large subject space. + const wcMaxSizeToCheck = 64 * 1024 + if i == bi && i < len(fs.blks)-1 && (!wc || fs.psim.Size() < wcMaxSizeToCheck) { + nbi, err := fs.checkSkipFirstBlock(filter, wc, bi) // Nothing available. - if nbi < 0 || lbi <= bi { + if err == ErrStoreEOF { return nil, fs.state.LastSeq, ErrStoreEOF } // See if we can jump ahead here. @@ -6529,9 +6531,7 @@ func (fs *fileStore) State() StreamState { // Can not be guaranteed to be sorted. if len(state.Deleted) > 0 { - sort.Slice(state.Deleted, func(i, j int) bool { - return state.Deleted[i] < state.Deleted[j] - }) + slices.Sort(state.Deleted) state.NumDeleted = len(state.Deleted) } return state @@ -8556,9 +8556,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt if err != nil { return nil, err } - <-dios - err = os.WriteFile(o.ifn, state, defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(o.ifn, state, defaultFilePerms) if err != nil { if didCreate { os.RemoveAll(odir) @@ -9032,9 +9030,7 @@ func (o *consumerFileStore) writeState(buf []byte) error { o.mu.Unlock() // Lock not held here but we do limit number of outstanding calls that could block OS threads. - <-dios - err := os.WriteFile(ifn, buf, defaultFilePerms) - dios <- struct{}{} + err := o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms) o.mu.Lock() if err != nil { @@ -9073,9 +9069,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - <-dios - err = os.WriteFile(keyFile, encrypted, defaultFilePerms) - dios <- struct{}{} + err = cfs.fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms) if err != nil { return err } @@ -9096,9 +9090,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { b = cfs.aek.Seal(nonce, nonce, b, nil) } - <-dios - err = os.WriteFile(meta, b, defaultFilePerms) - dios <- struct{}{} + err = cfs.fs.writeFileWithOptionalSync(meta, b, defaultFilePerms) if err != nil { return err } @@ -9107,9 +9099,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { checksum := hex.EncodeToString(cfs.hh.Sum(nil)) sum := filepath.Join(cfs.odir, JetStreamMetaFileSum) - <-dios - err = os.WriteFile(sum, []byte(checksum), defaultFilePerms) - dios <- struct{}{} + err = cfs.fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms) if err != nil { return err } @@ -9404,9 +9394,7 @@ func (o *consumerFileStore) Stop() error { if len(buf) > 0 { o.waitOnFlusher() - <-dios - err = os.WriteFile(ifn, buf, defaultFilePerms) - dios <- struct{}{} + err = o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms) } return err } @@ -9640,3 +9628,26 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) { return output, reader.Close() } + +// writeFileWithOptionalSync is equivalent to os.WriteFile() but optionally +// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is +// handled automatically by this function, so don't wrap calls to it in dios. +func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error { + <-dios + defer func() { + dios <- struct{}{} + }() + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC + if fs.fcfg.SyncAlways { + flags |= os.O_SYNC + } + f, err := os.OpenFile(name, flags, perm) + if err != nil { + return err + } + if _, err = f.Write(data); err != nil { + _ = f.Close() + return err + } + return f.Close() +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/gateway.go b/vendor/github.com/nats-io/nats-server/v2/server/gateway.go index 6c46eea889..82df196e2f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/gateway.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/gateway.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "cmp" "crypto/sha256" "crypto/tls" "encoding/json" @@ -22,7 +23,7 @@ import ( "math/rand" "net" "net/url" - "sort" + "slices" "strconv" "sync" "sync/atomic" @@ -1730,9 +1731,7 @@ func (s *Server) getOutboundGatewayConnections(a *[]*client) { // Gateway write lock is held on entry func (g *srvGateway) orderOutboundConnectionsLocked() { // Order the gateways by lowest RTT - sort.Slice(g.outo, func(i, j int) bool { - return g.outo[i].getRTTValue() < g.outo[j].getRTTValue() - }) + slices.SortFunc(g.outo, func(i, j *client) int { return cmp.Compare(i.getRTTValue(), j.getRTTValue()) }) } // Orders the array of outbound connections. @@ -2131,7 +2130,7 @@ func (c *client) processGatewayRSub(arg []byte) error { // for queue subscriptions. // -func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { +func (c *client) gatewayInterest(acc string, subj []byte) (bool, *SublistResult) { ei, accountInMap := c.gw.outsim.Load(acc) // If there is an entry for this account and ei is nil, // it means that the remote is not interested at all in @@ -2152,14 +2151,14 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { // but until e.ni is nil, use it to know if we // should suppress interest or not. if !c.gw.interestOnlyMode && e.ni != nil { - if _, inMap := e.ni[subj]; !inMap { + if _, inMap := e.ni[string(subj)]; !inMap { psi = true } } // If we are in modeInterestOnly (e.ni will be nil) // or if we have queue subs, we also need to check sl.Match. if e.ni == nil || e.qsubs > 0 { - r = e.sl.Match(subj) + r = e.sl.MatchBytes(subj) if len(r.psubs) > 0 { psi = true } @@ -2482,7 +2481,7 @@ func (g *srvGateway) shouldMapReplyForGatewaySend(acc *Account, reply []byte) bo } sl := sli.(*Sublist) if sl.Count() > 0 { - if r := sl.Match(string(reply)); len(r.psubs)+len(r.qsubs) > 0 { + if sl.HasInterest(string(reply)) { return true } } @@ -2568,7 +2567,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr } } else { // Plain sub interest and queue sub results for this account/subject - psi, qr := gwc.gatewayInterest(accName, string(subject)) + psi, qr := gwc.gatewayInterest(accName, subject) if !psi && qr == nil { continue } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go index c675bd1d1c..479babf81c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "cmp" "encoding/json" "errors" "fmt" @@ -22,7 +23,7 @@ import ( "os" "path/filepath" "runtime" - "sort" + "slices" "strconv" "strings" "sync/atomic" @@ -824,10 +825,10 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // Copy the state. Note the JSAPI only uses the hdr index to piece apart the // header from the msg body. No other references are needed. // Check pending and warn if getting backed up. - const warnThresh = 32 + const warnThresh = 128 pending := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) - if pending > warnThresh { - s.RateLimitWarnf("JetStream request queue has high pending count: %d", pending) + if pending >= warnThresh { + s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending) } } @@ -1203,8 +1204,8 @@ func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, _ *Account } ts := acc.templates() - sort.Slice(ts, func(i, j int) bool { - return strings.Compare(ts[i].StreamTemplateConfig.Name, ts[j].StreamTemplateConfig.Name) < 0 + slices.SortFunc(ts, func(i, j *streamTemplate) int { + return cmp.Compare(i.StreamTemplateConfig.Name, j.StreamTemplateConfig.Name) }) tcnt := len(ts) @@ -1624,7 +1625,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, } js.mu.RUnlock() if len(resp.Streams) > 1 { - sort.Slice(resp.Streams, func(i, j int) bool { return strings.Compare(resp.Streams[i], resp.Streams[j]) < 0 }) + slices.Sort(resp.Streams) } numStreams = len(resp.Streams) if offset > numStreams { @@ -1640,9 +1641,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, msets := acc.filteredStreams(filter) // Since we page results order matters. if len(msets) > 1 { - sort.Slice(msets, func(i, j int) bool { - return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0 - }) + slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) }) } numStreams = len(msets) @@ -1739,9 +1738,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s msets = acc.filteredStreams(filter) } - sort.Slice(msets, func(i, j int) bool { - return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0 - }) + slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) }) scnt := len(msets) if offset > scnt { @@ -1944,7 +1941,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s subjs = append(subjs, subj) } // Sort it - sort.Strings(subjs) + slices.Sort(subjs) if offset > len(subjs) { offset = len(subjs) @@ -4074,7 +4071,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account resp.Consumers = append(resp.Consumers, consumer) } if len(resp.Consumers) > 1 { - sort.Slice(resp.Consumers, func(i, j int) bool { return strings.Compare(resp.Consumers[i], resp.Consumers[j]) < 0 }) + slices.Sort(resp.Consumers) } numConsumers = len(resp.Consumers) if offset > numConsumers { @@ -4095,9 +4092,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account } obs := mset.getPublicConsumers() - sort.Slice(obs, func(i, j int) bool { - return strings.Compare(obs[i].name, obs[j].name) < 0 - }) + slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) }) numConsumers = len(obs) if offset > numConsumers { @@ -4190,9 +4185,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, } obs := mset.getPublicConsumers() - sort.Slice(obs, func(i, j int) bool { - return strings.Compare(obs[i].name, obs[j].name) < 0 - }) + slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) }) ocnt := len(obs) if offset > ocnt { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 78b8f9e7d5..18a67c32e9 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "cmp" crand "crypto/rand" "encoding/binary" "encoding/json" @@ -25,7 +26,7 @@ import ( "os" "path/filepath" "reflect" - "sort" + "slices" "strconv" "strings" "sync/atomic" @@ -1141,17 +1142,24 @@ type recoveryUpdates struct { // Streams and consumers are recovered from disk, and the meta layer's mappings // should clean them up, but under crash scenarios there could be orphans. func (js *jetStream) checkForOrphans() { - consumerName := func(o *consumer) string { - o.mu.RLock() - defer o.mu.RUnlock() - return o.name - } - // Can not hold jetstream lock while trying to delete streams or consumers. js.mu.Lock() s, cc := js.srv, js.cluster s.Debugf("JetStream cluster checking for orphans") + // We only want to cleanup any orphans if we know we are current with the meta-leader. + meta := cc.meta + if meta == nil || meta.GroupLeader() == _EMPTY_ { + js.mu.Unlock() + s.Debugf("JetStream cluster skipping check for orphans, no meta-leader") + return + } + if !meta.Healthy() { + js.mu.Unlock() + s.Debugf("JetStream cluster skipping check for orphans, not current with the meta-leader") + return + } + var streams []*stream var consumers []*consumer @@ -1164,8 +1172,7 @@ func (js *jetStream) checkForOrphans() { } else { // This one is good, check consumers now. for _, o := range mset.getConsumers() { - consumer := consumerName(o) - if sa.consumers[consumer] == nil { + if sa.consumers[o.String()] == nil { consumers = append(consumers, o) } } @@ -1333,6 +1340,11 @@ func (js *jetStream) monitorCluster() { updateConsumers: make(map[string]*consumerAssignment), } + // Make sure to cancel any pending checkForOrphans calls if the + // monitor goroutine exits. + var oc *time.Timer + defer stopAndClearTimer(&oc) + for { select { case <-s.quitCh: @@ -1369,7 +1381,7 @@ func (js *jetStream) monitorCluster() { // Clear. ru = nil s.Debugf("Recovered JetStream cluster metadata") - js.checkForOrphans() + oc = time.AfterFunc(30*time.Second, js.checkForOrphans) // Do a health check here as well. go checkHealth() continue @@ -2249,7 +2261,12 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Make sure to stop the raft group on exit to prevent accidental memory bloat. // This should be below the checkInMonitor call though to avoid stopping it out // from underneath the one that is running since it will be the same raft node. - defer n.Stop() + defer func() { + // We might be closing during shutdown, don't pre-emptively stop here since we'll still want to install snapshots. + if !mset.closed.Load() { + n.Stop() + } + }() qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID() @@ -2408,6 +2425,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps return case <-aq.ch: var ne, nb uint64 + // If we bump clfs we will want to write out snapshot if within our time window. + pclfs := mset.getCLFS() + ces := aq.pop() for _, ce := range ces { // No special processing needed for when we are caught up on restart. @@ -2424,6 +2444,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } continue } + // Apply our entries. if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { // Update our applied. @@ -2455,7 +2476,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Check about snapshotting // If we have at least min entries to compact, go ahead and try to snapshot/compact. - if ne >= compactNumMin || nb > compactSizeMin { + if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs { + // We want to make sure we do not short circuit if transistioning from no clfs. + if pclfs == 0 { + // This is always false by default. + lastState.firstNeedsUpdate = true + lastSnapTime = time.Time{} + } doSnapshot() } @@ -3114,13 +3141,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } } - if !isRecovering && !mset.IsLeader() { + if isRecovering || !mset.IsLeader() { if err := mset.processSnapshot(ss); err != nil { return err } - } else if isRecovering { - // On recovery, reset CLFS/FAILED. - mset.setCLFS(ss.Failed) } } else if e.Type == EntryRemovePeer { js.mu.RLock() @@ -5894,15 +5918,15 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo return nil, &err } // Sort based on available from most to least, breaking ties by number of total streams assigned to the peer. - sort.Slice(nodes, func(i, j int) bool { - if nodes[i].avail == nodes[j].avail { - return nodes[i].ns < nodes[j].ns + slices.SortFunc(nodes, func(i, j wn) int { + if i.avail == j.avail { + return cmp.Compare(i.ns, j.ns) } - return nodes[i].avail > nodes[j].avail + return -cmp.Compare(i.avail, j.avail) // reverse }) // If we are placing a replicated stream, let's sort based on HAAssets, as that is more important to balance. if cfg.Replicas > 1 { - sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].ha < nodes[j].ha }) + slices.SortStableFunc(nodes, func(i, j wn) int { return cmp.Compare(i.ha, j.ha) }) } var results []string @@ -6662,9 +6686,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt // Needs to be sorted for offsets etc. if len(streams) > 1 { - sort.Slice(streams, func(i, j int) bool { - return strings.Compare(streams[i].Config.Name, streams[j].Config.Name) < 0 - }) + slices.SortFunc(streams, func(i, j *streamAssignment) int { return cmp.Compare(i.Config.Name, j.Config.Name) }) } scnt := len(streams) @@ -6777,9 +6799,7 @@ LOOP: // Needs to be sorted as well. if len(resp.Streams) > 1 { - sort.Slice(resp.Streams, func(i, j int) bool { - return strings.Compare(resp.Streams[i].Config.Name, resp.Streams[j].Config.Name) < 0 - }) + slices.SortFunc(resp.Streams, func(i, j *StreamInfo) int { return cmp.Compare(i.Config.Name, j.Config.Name) }) } resp.Total = scnt @@ -6812,9 +6832,7 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of } // Needs to be sorted. if len(consumers) > 1 { - sort.Slice(consumers, func(i, j int) bool { - return strings.Compare(consumers[i].Name, consumers[j].Name) < 0 - }) + slices.SortFunc(consumers, func(i, j *consumerAssignment) int { return cmp.Compare(i.Config.Name, j.Config.Name) }) } ocnt := len(consumers) @@ -6924,9 +6942,7 @@ LOOP: // Needs to be sorted as well. if len(resp.Consumers) > 1 { - sort.Slice(resp.Consumers, func(i, j int) bool { - return strings.Compare(resp.Consumers[i].Name, resp.Consumers[j].Name) < 0 - }) + slices.SortFunc(resp.Consumers, func(i, j *ConsumerInfo) int { return cmp.Compare(i.Name, j.Name) }) } resp.Total = ocnt @@ -7799,13 +7815,16 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ } // Expected last sequence per subject. // We can check for last sequence per subject but only if the expected seq <= lseq. - if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 && seq <= lseq { + if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq <= lseq { var smv StoreMsg var fseq uint64 sm, err := store.LoadLastMsg(subject, &smv) if sm != nil { fseq = sm.seq } + if err == ErrStoreMsgNotFound && seq == 0 { + fseq, err = 0, nil + } if err != nil || fseq != seq { if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} @@ -8537,9 +8556,7 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { } // Order the result based on the name so that we get something consistent // when doing repeated stream info in the CLI, etc... - sort.Slice(ci.Replicas, func(i, j int) bool { - return ci.Replicas[i].Name < ci.Replicas[j].Name - }) + slices.SortFunc(ci.Replicas, func(i, j *PeerInfo) int { return cmp.Compare(i.Name, j.Name) }) return ci } @@ -8599,9 +8616,8 @@ func (js *jetStream) streamAlternates(ci *ClientInfo, stream string) []StreamAlt } // Sort based on our weights that originate from the request itself. - sort.Slice(alts, func(i, j int) bool { - return weights[alts[i].Cluster] > weights[alts[j].Cluster] - }) + // reverse sort + slices.SortFunc(alts, func(i, j StreamAlternate) int { return -cmp.Compare(weights[i.Cluster], weights[j.Cluster]) }) return alts } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go index 3c20cbdf43..e40cfcab89 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go @@ -34,6 +34,7 @@ import ( "sync" "sync/atomic" "time" + "unicode" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" @@ -1764,6 +1765,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return err } + // Reject a cluster that contains spaces or line breaks. + if proto.Cluster != _EMPTY_ && strings.ContainsFunc(proto.Cluster, unicode.IsSpace) { + c.sendErrAndErr(ErrClusterNameHasSpaces.Error()) + c.closeConnection(ProtocolViolation) + return ErrClusterNameHasSpaces + } + // Check for cluster name collisions. if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn { c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error()) @@ -2669,9 +2677,8 @@ func (c *client) processInboundLeafMsg(msg []byte) { // Go back to the sublist data structure. if !ok { r = c.acc.sl.Match(subject) - c.in.results[subject] = r // Prune the results cache. Keeps us from unbounded growth. Random delete. - if len(c.in.results) > maxResultCacheSize { + if len(c.in.results) >= maxResultCacheSize { n := 0 for subj := range c.in.results { delete(c.in.results, subj) @@ -2680,6 +2687,8 @@ func (c *client) processInboundLeafMsg(msg []byte) { } } } + // Then add the new cache entry. + c.in.results[subject] = r } // Collect queue names if needed. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/log.go b/vendor/github.com/nats-io/nats-server/v2/server/log.go index e1b9078a5e..6822265823 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/log.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/log.go @@ -219,6 +219,14 @@ func (s *Server) Warnf(format string, v ...any) { }, format, v...) } +func (s *Server) rateLimitFormatWarnf(format string, v ...any) { + if _, loaded := s.rateLimitLogging.LoadOrStore(format, time.Now()); loaded { + return + } + statement := fmt.Sprintf(format, v...) + s.Warnf("%s", statement) +} + func (s *Server) RateLimitWarnf(format string, v ...any) { statement := fmt.Sprintf(format, v...) if _, loaded := s.rateLimitLogging.LoadOrStore(statement, time.Now()); loaded { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go index b72ee09d57..46e262e515 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "cmp" "crypto/sha256" "crypto/tls" "crypto/x509" @@ -29,6 +30,7 @@ import ( "path/filepath" "runtime" "runtime/pprof" + "slices" "sort" "strconv" "strings" @@ -2724,15 +2726,16 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { // JSzOptions are options passed to Jsz type JSzOptions struct { - Account string `json:"account,omitempty"` - Accounts bool `json:"accounts,omitempty"` - Streams bool `json:"streams,omitempty"` - Consumer bool `json:"consumer,omitempty"` - Config bool `json:"config,omitempty"` - LeaderOnly bool `json:"leader_only,omitempty"` - Offset int `json:"offset,omitempty"` - Limit int `json:"limit,omitempty"` - RaftGroups bool `json:"raft,omitempty"` + Account string `json:"account,omitempty"` + Accounts bool `json:"accounts,omitempty"` + Streams bool `json:"streams,omitempty"` + Consumer bool `json:"consumer,omitempty"` + Config bool `json:"config,omitempty"` + LeaderOnly bool `json:"leader_only,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` + RaftGroups bool `json:"raft,omitempty"` + StreamLeaderOnly bool `json:"stream_leader_only,omitempty"` } // HealthzOptions are options passed to Healthz @@ -2749,8 +2752,9 @@ type HealthzOptions struct { // ProfilezOptions are options passed to Profilez type ProfilezOptions struct { - Name string `json:"name"` - Debug int `json:"debug"` + Name string `json:"name"` + Debug int `json:"debug"` + Duration time.Duration `json:"duration,omitempty"` } // StreamDetail shows information about the stream state and its consumers. @@ -2806,7 +2810,7 @@ type JSInfo struct { AccountDetails []*AccountDetail `json:"account_details,omitempty"` } -func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft bool) *AccountDetail { +func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail { jsa.mu.RLock() acc := jsa.account name := acc.GetName() @@ -2852,6 +2856,10 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, c := stream.config() cfg = &c } + // Skip if we are only looking for stream leaders. + if optStreamLeader && ci != nil && ci.Leader != s.Name() { + continue + } sdet := StreamDetail{ Name: stream.name(), Created: stream.createdTime(), @@ -2907,7 +2915,7 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } - return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups), nil + return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly), nil } // helper to get cluster info from node via dummy group @@ -3012,9 +3020,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { accounts = []*jsAccount{accounts[filterIdx]} } else if opts.Accounts { if opts.Offset != 0 { - sort.Slice(accounts, func(i, j int) bool { - return strings.Compare(accounts[i].acc().Name, accounts[j].acc().Name) < 0 - }) + slices.SortFunc(accounts, func(i, j *jsAccount) int { return cmp.Compare(i.acc().Name, j.acc().Name) }) if opts.Offset > len(accounts) { accounts = []*jsAccount{} } else { @@ -3034,7 +3040,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { } // if wanted, obtain accounts/streams/consumer for _, jsa := range accounts { - detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups) + detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly) jsi.AccountDetails = append(jsi.AccountDetails, detail) } return jsi, nil @@ -3078,16 +3084,22 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { return } + sleader, err := decodeBool(w, r, "stream-leader-only") + if err != nil { + return + } + l, err := s.Jsz(&JSzOptions{ - r.URL.Query().Get("acc"), - accounts, - streams, - consumers, - config, - leader, - offset, - limit, - rgroups, + Account: r.URL.Query().Get("acc"), + Accounts: accounts, + Streams: streams, + Consumer: consumers, + Config: config, + LeaderOnly: leader, + Offset: offset, + Limit: limit, + RaftGroups: rgroups, + StreamLeaderOnly: sleader, }) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -3724,21 +3736,36 @@ type ProfilezStatus struct { } func (s *Server) profilez(opts *ProfilezOptions) *ProfilezStatus { - if opts.Name == _EMPTY_ { + var buffer bytes.Buffer + switch opts.Name { + case _EMPTY_: return &ProfilezStatus{ Error: "Profile name not specified", } - } - profile := pprof.Lookup(opts.Name) - if profile == nil { - return &ProfilezStatus{ - Error: fmt.Sprintf("Profile %q not found", opts.Name), + case "cpu": + if opts.Duration <= 0 || opts.Duration > 15*time.Second { + return &ProfilezStatus{ + Error: fmt.Sprintf("Duration %s should be between 0s and 15s", opts.Duration), + } } - } - var buffer bytes.Buffer - if err := profile.WriteTo(&buffer, opts.Debug); err != nil { - return &ProfilezStatus{ - Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err), + if err := pprof.StartCPUProfile(&buffer); err != nil { + return &ProfilezStatus{ + Error: fmt.Sprintf("Failed to start CPU profile: %s", err), + } + } + time.Sleep(opts.Duration) + pprof.StopCPUProfile() + default: + profile := pprof.Lookup(opts.Name) + if profile == nil { + return &ProfilezStatus{ + Error: fmt.Sprintf("Profile %q not found", opts.Name), + } + } + if err := profile.WriteTo(&buffer, opts.Debug); err != nil { + return &ProfilezStatus{ + Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err), + } } } return &ProfilezStatus{ @@ -3787,8 +3814,14 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) { gfilter := r.URL.Query().Get("group") afilter := r.URL.Query().Get("acc") - if afilter == "" { - afilter = s.SystemAccount().Name + if afilter == _EMPTY_ { + if sys := s.SystemAccount(); sys != nil { + afilter = sys.Name + } else { + w.WriteHeader(404) + w.Write([]byte("System account not found, the server may be shutting down")) + return + } } groups := map[string]RaftNode{} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go b/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go index 33a0010992..1c6a98a2db 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "cmp" "crypto/tls" "encoding/binary" "encoding/json" @@ -23,7 +24,7 @@ import ( "io" "net" "net/http" - "sort" + "slices" "strconv" "strings" "sync" @@ -4276,9 +4277,7 @@ func (s *Server) mqttCheckPubRetainedPerms() { }) } asm.mu.RUnlock() - sort.Slice(rms, func(i, j int) bool { - return rms[i].rmsg.sseq < rms[j].rmsg.sseq - }) + slices.SortFunc(rms, func(i, j retainedMsg) int { return cmp.Compare(i.rmsg.sseq, j.rmsg.sseq) }) perms := map[string]*perm{} deletes := map[string]uint64{} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_windows.go b/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_windows.go index 5ee9645f3f..04e3ae8bb9 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_windows.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_windows.go @@ -25,10 +25,12 @@ import ( "syscall" "time" "unsafe" + + "golang.org/x/sys/windows" ) var ( - pdh = syscall.NewLazyDLL("pdh.dll") + pdh = windows.NewLazySystemDLL("pdh.dll") winPdhOpenQuery = pdh.NewProc("PdhOpenQuery") winPdhAddCounter = pdh.NewProc("PdhAddCounterW") winPdhCollectQueryData = pdh.NewProc("PdhCollectQueryData") @@ -36,6 +38,20 @@ var ( winPdhGetFormattedCounterArray = pdh.NewProc("PdhGetFormattedCounterArrayW") ) +func init() { + if err := pdh.Load(); err != nil { + panic(err) + } + for _, p := range []*windows.LazyProc{ + winPdhOpenQuery, winPdhAddCounter, winPdhCollectQueryData, + winPdhGetFormattedCounterValue, winPdhGetFormattedCounterArray, + } { + if err := p.Find(); err != nil { + panic(err) + } + } +} + // global performance counter query handle and counters var ( pcHandle PDH_HQUERY diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 347d788eb3..165cbbe91f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -499,7 +499,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe // If we fail to do this for some reason then this is fatal — we cannot // continue setting up or the Raft node may be partially/totally isolated. if err := n.createInternalSubs(); err != nil { - n.shutdown(true) + n.shutdown(false) return nil, err } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/reload.go b/vendor/github.com/nats-io/nats-server/v2/server/reload.go index d15525d5d6..15bbae1e38 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/reload.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/reload.go @@ -14,12 +14,13 @@ package server import ( + "cmp" "crypto/tls" "errors" "fmt" "net/url" "reflect" - "sort" + "slices" "strings" "sync/atomic" "time" @@ -1131,38 +1132,24 @@ func (s *Server) reloadOptions(curOpts, newOpts *Options) error { func imposeOrder(value any) error { switch value := value.(type) { case []*Account: - sort.Slice(value, func(i, j int) bool { - return value[i].Name < value[j].Name - }) + slices.SortFunc(value, func(i, j *Account) int { return cmp.Compare(i.Name, j.Name) }) for _, a := range value { - sort.Slice(a.imports.streams, func(i, j int) bool { - return a.imports.streams[i].acc.Name < a.imports.streams[j].acc.Name - }) + slices.SortFunc(a.imports.streams, func(i, j *streamImport) int { return cmp.Compare(i.acc.Name, j.acc.Name) }) } case []*User: - sort.Slice(value, func(i, j int) bool { - return value[i].Username < value[j].Username - }) + slices.SortFunc(value, func(i, j *User) int { return cmp.Compare(i.Username, j.Username) }) case []*NkeyUser: - sort.Slice(value, func(i, j int) bool { - return value[i].Nkey < value[j].Nkey - }) + slices.SortFunc(value, func(i, j *NkeyUser) int { return cmp.Compare(i.Nkey, j.Nkey) }) case []*url.URL: - sort.Slice(value, func(i, j int) bool { - return value[i].String() < value[j].String() - }) + slices.SortFunc(value, func(i, j *url.URL) int { return cmp.Compare(i.String(), j.String()) }) case []string: - sort.Strings(value) + slices.Sort(value) case []*jwt.OperatorClaims: - sort.Slice(value, func(i, j int) bool { - return value[i].Issuer < value[j].Issuer - }) + slices.SortFunc(value, func(i, j *jwt.OperatorClaims) int { return cmp.Compare(i.Issuer, j.Issuer) }) case GatewayOpts: - sort.Slice(value.Gateways, func(i, j int) bool { - return value.Gateways[i].Name < value.Gateways[j].Name - }) + slices.SortFunc(value.Gateways, func(i, j *RemoteGatewayOpts) int { return cmp.Compare(i.Name, j.Name) }) case WebsocketOpts: - sort.Strings(value.AllowedOrigins) + slices.Sort(value.AllowedOrigins) case string, bool, uint8, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList, *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig: diff --git a/vendor/github.com/nats-io/nats-server/v2/server/server.go b/vendor/github.com/nats-io/nats-server/v2/server/server.go index cc3130ebe5..4907bd5b85 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/server.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/server.go @@ -27,8 +27,10 @@ import ( "math/rand" "net" "net/http" + "net/url" "regexp" "runtime/pprof" + "unicode" // Allow dynamic profiling. _ "net/http/pprof" @@ -953,11 +955,13 @@ func (s *Server) serverName() string { func (s *Server) ClientURL() string { // FIXME(dlc) - should we add in user and pass if defined single? opts := s.getOpts() - scheme := "nats://" + var u url.URL + u.Scheme = "nats" if opts.TLSConfig != nil { - scheme = "tls://" + u.Scheme = "tls" } - return fmt.Sprintf("%s%s:%d", scheme, opts.Host, opts.Port) + u.Host = net.JoinHostPort(opts.Host, fmt.Sprintf("%d", opts.Port)) + return u.String() } func validateCluster(o *Options) error { @@ -1698,12 +1702,14 @@ func (s *Server) setSystemAccount(acc *Account) error { replies: make(map[string]msgHandler), sendq: newIPQueue[*pubMsg](s, "System sendQ"), recvq: newIPQueue[*inSysMsg](s, "System recvQ"), + recvqp: newIPQueue[*inSysMsg](s, "System recvQ Pings"), resetCh: make(chan struct{}), sq: s.newSendQ(), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval, } + recvq, recvqp := s.sys.recvq, s.sys.recvqp s.sys.wg.Add(1) s.mu.Unlock() @@ -1717,7 +1723,9 @@ func (s *Server) setSystemAccount(acc *Account) error { go s.internalSendLoop(&s.sys.wg) // Start the internal loop for inbound messages. - go s.internalReceiveLoop() + go s.internalReceiveLoop(recvq) + // Start the internal loop for inbound STATSZ/Ping messages. + go s.internalReceiveLoop(recvqp) // Start up our general subscriptions s.initEventTracking() @@ -2348,6 +2356,9 @@ func (s *Server) Start() { // Solicit remote servers for leaf node connections. if len(opts.LeafNode.Remotes) > 0 { s.solicitLeafNodeRemotes(opts.LeafNode.Remotes) + if opts.Cluster.Name == opts.ServerName && strings.ContainsFunc(opts.Cluster.Name, unicode.IsSpace) { + s.Warnf("Server name has spaces and used as the cluster name, leaf remotes may not connect properly") + } } // TODO (ik): I wanted to refactor this by starting the client @@ -2772,6 +2783,7 @@ func (s *Server) StartProfiler() { Addr: hp, Handler: http.DefaultServeMux, MaxHeaderBytes: 1 << 20, + ReadTimeout: time.Second * 5, } s.profiler = l s.profilingServer = srv @@ -2971,10 +2983,11 @@ func (s *Server) startMonitoring(secure bool) error { // to return empty response or unable to display page if the // server needs more time to build the response. srv := &http.Server{ - Addr: hp, - Handler: mux, - MaxHeaderBytes: 1 << 20, - ErrorLog: log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0), + Addr: hp, + Handler: mux, + MaxHeaderBytes: 1 << 20, + ErrorLog: log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0), + ReadHeaderTimeout: time.Second * 5, } s.mu.Lock() s.http = httpListener @@ -3294,7 +3307,7 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client { // This will save off a closed client in a ring buffer such that // /connz can inspect. Useful for debugging, etc. -func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { +func (s *Server) saveClosedClient(c *client, nc net.Conn, subs map[string]*subscription, reason ClosedState) { now := time.Now() s.accountDisconnectEvent(c, now, reason.String()) @@ -3303,17 +3316,18 @@ func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { cc := &closedClient{} cc.fill(c, nc, now, false) + // Note that cc.fill is using len(c.subs), which may have been set to nil by now, + // so replace cc.NumSubs with len(subs). + cc.NumSubs = uint32(len(subs)) cc.Stop = &now cc.Reason = reason.String() // Do subs, do not place by default in main ConnInfo - if len(c.subs) > 0 { - cc.subs = make([]SubDetail, 0, len(c.subs)) - for _, sub := range c.subs { + if len(subs) > 0 { + cc.subs = make([]SubDetail, 0, len(subs)) + for _, sub := range subs { cc.subs = append(cc.subs, newSubDetail(sub)) } - // Now set this to nil to allow connection to be released. - c.subs = nil } // Hold user as well. cc.user = c.getRawAuthUser() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/store.go b/vendor/github.com/nats-io/nats-server/v2/server/store.go index 235452a30f..abbc06f899 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/store.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/store.go @@ -765,3 +765,11 @@ func stringToBytes(s string) []byte { b := unsafe.Slice(p, len(s)) return b } + +// Forces a copy of a string, for use in the case that you might have been passed a value when bytesToString was used, +// but now you need a separate copy of it to store for longer-term use. +func copyString(s string) string { + b := make([]byte, len(s)) + copy(b, s) + return bytesToString(b) +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index a09afdbf32..fefbc1f11d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -236,6 +236,7 @@ type stream struct { consumers map[string]*consumer // The consumers for this stream. numFilter int // The number of filtered consumers. cfg StreamConfig // The stream's config. + cfgMu sync.RWMutex // Config mutex used to solve some races with consumer code created time.Time // Time the stream was created. stype StorageType // The storage type. tier string // The tier is the number of replicas for the stream (e.g. "R1" or "R3"). @@ -1018,6 +1019,9 @@ func (mset *stream) lastSeqAndCLFS() (uint64, uint64) { } func (mset *stream) getCLFS() uint64 { + if mset == nil { + return 0 + } mset.clMu.Lock() defer mset.clMu.Unlock() return mset.clfs @@ -1950,7 +1954,14 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) } // Now update config and store's version of our config. + // Although we are under the stream write lock, we will also assign the new + // configuration under mset.cfgMu lock. This is so that in places where + // mset.mu cannot be acquired (like many cases in consumer.go where code + // is under the consumer's lock), and the stream's configuration needs to + // be inspected, one can use mset.cfgMu's read lock to do that safely. + mset.cfgMu.Lock() mset.cfg = *cfg + mset.cfgMu.Unlock() // If we're changing retention and haven't errored because of consumer // replicas by now, whip through and update the consumer retention. @@ -2001,6 +2012,15 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) return nil } +// Small helper to return the Name field from mset.cfg, protected by +// the mset.cfgMu mutex. This is simply because we have several places +// in consumer.go where we need it. +func (mset *stream) getCfgName() string { + mset.cfgMu.RLock() + defer mset.cfgMu.RUnlock() + return mset.cfg.Name +} + // Purge will remove all messages from the stream and underlying store based on the request. func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) { mset.mu.RLock() @@ -4369,7 +4389,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Expected last sequence per subject. - // If we are clustered we have prechecked seq > 0. if seq, exists := getExpectedLastSeqPerSubject(hdr); exists { // TODO(dlc) - We could make a new store func that does this all in one. var smv StoreMsg @@ -4378,8 +4397,17 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if sm != nil { fseq = sm.seq } - if err == ErrStoreMsgNotFound && seq == 0 { - fseq, err = 0, nil + if err == ErrStoreMsgNotFound { + if seq == 0 { + fseq, err = 0, nil + } else if mset.isClustered() { + // Do not bump clfs in case message was not found and could have been deleted. + var ss StreamState + store.FastState(&ss) + if seq <= ss.LastSeq { + fseq, err = seq, nil + } + } } if err != nil || fseq != seq { mset.mu.Unlock() @@ -4521,7 +4549,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, mset.lmsgId = msgId // If we have a msgId make sure to save. if msgId != _EMPTY_ { - mset.storeMsgIdLocked(&ddentry{msgId, seq, ts}) + mset.storeMsgIdLocked(&ddentry{msgId, mset.lseq, ts}) } if canRespond { response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...) @@ -4573,6 +4601,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, response, _ = json.Marshal(resp) mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } + mset.mu.Unlock() + return err } } @@ -5069,6 +5099,10 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { // Kick monitor and collect consumers first. mset.mu.Lock() + + // Mark closed. + mset.closed.Store(true) + // Signal to the monitor loop. // Can't use qch here. if mset.mqch != nil { @@ -5129,9 +5163,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.sendDeleteAdvisoryLocked() } - // Mark closed. - mset.closed.Store(true) - // Quit channel, do this after sending the delete advisory if mset.qch != nil { close(mset.qch) @@ -5272,92 +5303,9 @@ func (mset *stream) checkInterestState() { return } - var zeroAcks []*consumer - var lowAckFloor uint64 = math.MaxUint64 - for _, o := range mset.getConsumers() { o.checkStateForInterestStream() - - o.mu.Lock() - if o.isLeader() { - // We need to account for consumers with ack floor of zero. - // We will collect them and see if we need to check pending below. - if o.asflr == 0 { - zeroAcks = append(zeroAcks, o) - } else if o.asflr < lowAckFloor { - lowAckFloor = o.asflr - } - } else { - // We are a follower so only have the store state, so read that in. - state, err := o.store.State() - if err != nil { - // On error we will not have enough information to process correctly so bail. - o.mu.Unlock() - return - } - // We need to account for consumers with ack floor of zero. - if state.AckFloor.Stream == 0 { - zeroAcks = append(zeroAcks, o) - } else if state.AckFloor.Stream < lowAckFloor { - lowAckFloor = state.AckFloor.Stream - } - // We are a follower here but if we detect a drift from when we were previous leader correct here. - if o.asflr > state.AckFloor.Stream || o.sseq > state.Delivered.Stream+1 { - o.applyState(state) - } - } - o.mu.Unlock() } - - // If nothing was set we can bail. - if lowAckFloor == math.MaxUint64 { - return - } - - // Capture our current state. - // ok to do so without lock. - var state StreamState - mset.store.FastState(&state) - - if lowAckFloor <= state.FirstSeq { - return - } - - // Do not want to hold stream lock if calculating numPending. - // Check if we had any zeroAcks, we will need to check them. - for _, o := range zeroAcks { - var np uint64 - o.mu.RLock() - if o.isLeader() { - np = uint64(o.numPending()) - } else { - np, _ = o.calculateNumPending() - } - o.mu.RUnlock() - // This means we have pending and can not remove anything at this time. - if np > 0 { - return - } - } - - mset.mu.Lock() - defer mset.mu.Unlock() - - // Check which purge we need to perform. - if lowAckFloor <= state.LastSeq || state.Msgs == 0 { - // Purge the stream to lowest ack floor + 1 - mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0) - } else { - // Here we have a low ack floor higher then our last seq. - // So we will just do normal purge. - mset.store.Purge() - } - - // Make sure to reset our local lseq. - mset.store.FastState(&state) - mset.lseq = state.LastSeq - // Also make sure we clear any pending acks. - mset.clearAllPreAcksBelowFloor(state.FirstSeq) } func (mset *stream) isInterestRetention() bool { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/node48.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/node48.go index fe7ef54352..7099edd58b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stree/node48.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/node48.go @@ -51,9 +51,9 @@ func (n *node48) isFull() bool { return n.size >= 48 } func (n *node48) grow() node { nn := newNode256(n.prefix) - for c := byte(0); c < 255; c++ { - if i := n.key[c]; i > 0 { - nn.addChild(c, n.child[i-1]) + for c := 0; c < len(n.key); c++ { + if i := n.key[byte(c)]; i > 0 { + nn.addChild(byte(c), n.child[i-1]) } } return nn @@ -69,9 +69,9 @@ func (n *node48) deleteChild(c byte) { last := byte(n.size - 1) if i < last { n.child[i] = n.child[last] - for c := byte(0); c <= 255; c++ { - if n.key[c] == last+1 { - n.key[c] = i + 1 + for ic := 0; ic < len(n.key); ic++ { + if n.key[byte(ic)] == last+1 { + n.key[byte(ic)] = i + 1 break } } @@ -87,9 +87,9 @@ func (n *node48) shrink() node { return nil } nn := newNode16(nil) - for c := byte(0); c < 255; c++ { - if i := n.key[c]; i > 0 { - nn.addChild(c, n.child[i-1]) + for c := 0; c < len(n.key); c++ { + if i := n.key[byte(c)]; i > 0 { + nn.addChild(byte(c), n.child[i-1]) } } return nn diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go index d0835bf5d1..d9167d94f7 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go @@ -15,7 +15,7 @@ package stree import ( "bytes" - "sort" + "slices" ) // SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects. @@ -382,7 +382,7 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T } } // Now sort. - sort.SliceStable(nodes, func(i, j int) bool { return bytes.Compare(nodes[i].path(), nodes[j].path()) < 0 }) + slices.SortStableFunc(nodes, func(a, b node) int { return bytes.Compare(a.path(), b.path()) }) // Now walk the nodes in order and call into next iter. for i := range nodes { if !t.iter(nodes[i], pre, cb) { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sublist.go b/vendor/github.com/nats-io/nats-server/v2/server/sublist.go index 0000ad9f9a..67eb88ae07 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sublist.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sublist.go @@ -527,7 +527,13 @@ var emptyResult = &SublistResult{} // Match will match all entries to the literal subject. // It will return a set of results for both normal and queue subscribers. func (s *Sublist) Match(subject string) *SublistResult { - return s.match(subject, true) + return s.match(subject, true, false) +} + +// MatchBytes will match all entries to the literal subject. +// It will return a set of results for both normal and queue subscribers. +func (s *Sublist) MatchBytes(subject []byte) *SublistResult { + return s.match(bytesToString(subject), true, true) } // HasInterest will return whether or not there is any interest in the subject. @@ -537,10 +543,10 @@ func (s *Sublist) HasInterest(subject string) bool { } func (s *Sublist) matchNoLock(subject string) *SublistResult { - return s.match(subject, false) + return s.match(subject, false, false) } -func (s *Sublist) match(subject string, doLock bool) *SublistResult { +func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *SublistResult { atomic.AddUint64(&s.matches, 1) // Check cache first. @@ -595,6 +601,9 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { result = emptyResult } if cacheEnabled { + if doCopyOnCache { + subject = copyString(subject) + } s.cache[subject] = result n = len(s.cache) } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_windows.go b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_windows.go index cc4c43dc95..f557c0b8ee 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_windows.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_windows.go @@ -17,10 +17,23 @@ package sysmem import ( - "syscall" "unsafe" + + "golang.org/x/sys/windows" ) +var winKernel32 = windows.NewLazySystemDLL("kernel32.dll") +var winGlobalMemoryStatusEx = winKernel32.NewProc("GlobalMemoryStatusEx") + +func init() { + if err := winKernel32.Load(); err != nil { + panic(err) + } + if err := winGlobalMemoryStatusEx.Find(); err != nil { + panic(err) + } +} + // https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/ns-sysinfoapi-memorystatusex type _memoryStatusEx struct { dwLength uint32 @@ -30,16 +43,8 @@ type _memoryStatusEx struct { } func Memory() int64 { - kernel32, err := syscall.LoadDLL("kernel32.dll") - if err != nil { - return 0 - } - globalMemoryStatusEx, err := kernel32.FindProc("GlobalMemoryStatusEx") - if err != nil { - return 0 - } msx := &_memoryStatusEx{dwLength: 64} - res, _, _ := globalMemoryStatusEx.Call(uintptr(unsafe.Pointer(msx))) + res, _, _ := winGlobalMemoryStatusEx.Call(uintptr(unsafe.Pointer(msx))) if res == 0 { return 0 } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/websocket.go b/vendor/github.com/nats-io/nats-server/v2/server/websocket.go index 1752942303..6fce09dd9f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/websocket.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/websocket.go @@ -667,7 +667,7 @@ func (c *client) wsHandleProtocolError(message string) error { buf := wsCreateCloseMessage(wsCloseStatusProtocolError, message) c.wsEnqueueControlMessage(wsCloseMessage, buf) nbPoolPut(buf) // wsEnqueueControlMessage has taken a copy. - return fmt.Errorf(message) + return errors.New(message) } // Create a close message with the given `status` and `body`. diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE index 6a66aea5ea..2a7cf70da6 100644 --- a/vendor/golang.org/x/time/LICENSE +++ b/vendor/golang.org/x/time/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. +Copyright 2009 The Go Authors. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are @@ -10,7 +10,7 @@ notice, this list of conditions and the following disclaimer. copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - * Neither the name of Google Inc. nor the names of its + * Neither the name of Google LLC nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. diff --git a/vendor/modules.txt b/vendor/modules.txt index 12b43f1e49..fc862f5d9c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1426,8 +1426,8 @@ github.com/munnerz/goautoneg # github.com/nats-io/jwt/v2 v2.5.8 ## explicit; go 1.18 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.10.18 -## explicit; go 1.21 +# github.com/nats-io/nats-server/v2 v2.10.20 +## explicit; go 1.21.0 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/fastrand github.com/nats-io/nats-server/v2/internal/ldap @@ -2252,7 +2252,7 @@ golang.org/x/text/transform golang.org/x/text/unicode/bidi golang.org/x/text/unicode/norm golang.org/x/text/width -# golang.org/x/time v0.5.0 +# golang.org/x/time v0.6.0 ## explicit; go 1.18 golang.org/x/time/rate # golang.org/x/tools v0.24.0