Merge pull request #1410 from opencloud-eu/dependabot/go_modules/github.com/nats-io/nats-server/v2-2.11.8

build(deps): bump github.com/nats-io/nats-server/v2 from 2.11.7 to 2.11.8
This commit is contained in:
Ralf Haferkamp
2025-09-02 19:00:48 +02:00
committed by GitHub
16 changed files with 185 additions and 94 deletions

2
go.mod
View File

@@ -56,7 +56,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.7
github.com/nats-io/nats-server/v2 v2.11.8
github.com/nats-io/nats.go v1.45.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.0.9

4
go.sum
View File

@@ -869,8 +869,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.7 h1:lINWQ/Hb3cnaoHmWTjj/7WppZnaSh9C/1cD//nHCbms=
github.com/nats-io/nats-server/v2 v2.11.7/go.mod h1:DchDPVzAsAPqhqm7VLedX0L7hjnV/SYtlmsl9F8U53s=
github.com/nats-io/nats-server/v2 v2.11.8 h1:7T1wwwd/SKTDWW47KGguENE7Wa8CpHxLD1imet1iW7c=
github.com/nats-io/nats-server/v2 v2.11.8/go.mod h1:C2zlzMA8PpiMMxeXSz7FkU3V+J+H15kiqrkvgtn2kS8=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=

View File

@@ -704,7 +704,7 @@ func (c *client) initClient() {
if addr := c.nc.RemoteAddr(); addr != nil {
if conn = addr.String(); conn != _EMPTY_ {
host, port, _ := net.SplitHostPort(conn)
iPort, _ := strconv.Atoi(port)
iPort, _ := strconv.ParseUint(port, 10, 16)
c.host, c.port = host, uint16(iPort)
if c.isWebsocket() && c.ws.clientIP != _EMPTY_ {
cip := c.ws.clientIP

View File

@@ -58,7 +58,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.11.7"
VERSION = "2.11.8"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -2233,10 +2233,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
}
if cfg.SampleFrequency != o.cfg.SampleFrequency {
s := strings.TrimSuffix(cfg.SampleFrequency, "%")
// String has been already verified for validity up in the stack, so no
// need to check for error here.
sampleFreq, _ := strconv.Atoi(s)
o.sfreq = int32(sampleFreq)
if sampleFreq, err := strconv.ParseInt(s, 10, 32); err == nil {
o.sfreq = int32(sampleFreq)
}
}
// Set MaxDeliver if changed
if cfg.MaxDeliver != o.cfg.MaxDeliver {

View File

@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !windows && !openbsd && !netbsd && !wasm
//go:build !windows && !openbsd && !netbsd && !wasm && !illumos && !solaris
package server

View File

@@ -0,0 +1,38 @@
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build illumos || solaris
package server
import (
"os"
"golang.org/x/sys/unix"
)
func diskAvailable(storeDir string) int64 {
var ba int64
if _, err := os.Stat(storeDir); os.IsNotExist(err) {
os.MkdirAll(storeDir, defaultDirPerms)
}
var fs unix.Statvfs_t
if err := unix.Statvfs(storeDir, &fs); err == nil {
// Estimate 75% of available storage.
ba = int64(uint64(fs.Frsize) * uint64(fs.Bavail) / 4 * 3)
} else {
// Used 1TB default as a guess if all else fails.
ba = JetStreamMaxStoreDefault
}
return ba
}

View File

@@ -3057,7 +3057,7 @@ func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subj
replySubj := s.newRespInbox()
// Store our handler.
s.sys.replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
if n, err := strconv.Atoi(string(msg)); err == nil {
if n, err := strconv.ParseInt(string(msg), 10, 32); err == nil {
atomic.AddInt32(&nsubs, int32(n))
}
if atomic.AddInt32(&responses, 1) >= expected {

View File

@@ -1954,7 +1954,7 @@ func (fs *fileStore) recoverTTLState() error {
// Selecting the message block should return a block that contains this sequence,
// or a later block if it can't be found.
// It's an error if we can't find any block within the bounds of first and last seq.
fs.warn("Error loading msg block with seq %d for recovering TTL: %s", seq)
fs.warn("Error loading msg block with seq %d for recovering TTL", seq)
continue
}
seq = atomic.LoadUint64(&mb.first.seq)
@@ -1970,7 +1970,11 @@ func (fs *fileStore) recoverTTLState() error {
// beginning and process the next block.
mb.tryForceExpireCache()
mb = nil
goto retry
if seq <= fs.state.LastSeq {
goto retry
}
// Done.
break
}
msg, _, err := mb.fetchMsgNoCopy(seq, &sm)
if err != nil {
@@ -5039,7 +5043,7 @@ func (mb *msgBlock) compactWithFloor(floor uint64) {
// Grab info from a slot.
// Lock should be held.
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
if mb.cache == nil || slot >= len(mb.cache.idx) {
if slot < 0 || mb.cache == nil || slot >= len(mb.cache.idx) {
return 0, 0, false, errPartialCache
}
@@ -8611,6 +8615,10 @@ func (fs *fileStore) Truncate(seq uint64) error {
return ErrStoreSnapshotInProgress
}
// Any existing state file will no longer be applicable. We will force write a new one
// at the end, after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
var lsm *StoreMsg
smb := fs.selectMsgBlock(seq)
if smb != nil {
@@ -8637,9 +8645,15 @@ func (fs *fileStore) Truncate(seq uint64) error {
return err
}
// The selected message block needs to be removed if it needs to be fully truncated.
var removeSmb bool
if smb != nil {
removeSmb = atomic.LoadUint64(&smb.first.seq) > seq
}
// If the selected block is not found or the message was deleted, we'll need to write a tombstone
// at the truncated sequence so we don't roll backward on our last sequence and timestamp.
if lsm == nil {
if lsm == nil || removeSmb {
fs.writeTombstone(seq, lastTime)
}
@@ -8679,8 +8693,28 @@ func (fs *fileStore) Truncate(seq uint64) error {
hasWrittenTombstones := len(tmb.tombs()) > 0
if smb != nil {
// Make sure writeable.
smb.mu.Lock()
if removeSmb {
purged += smb.msgs
bytes += smb.bytes
// We could have tombstones for messages before the truncated sequence.
if tombs := smb.tombsLocked(); len(tombs) > 0 {
// Temporarily unlock while we write tombstones.
smb.mu.Unlock()
for _, tomb := range tombs {
if tomb.seq < seq {
fs.writeTombstone(tomb.seq, tomb.ts)
}
}
smb.mu.Lock()
}
fs.removeMsgBlock(smb)
smb.mu.Unlock()
goto SKIP
}
// Make sure writeable.
if err := smb.enableForWriting(fs.fip); err != nil {
smb.mu.Unlock()
fs.mu.Unlock()
@@ -8711,6 +8745,7 @@ func (fs *fileStore) Truncate(seq uint64) error {
smb.mu.Unlock()
}
SKIP:
// If no tombstones were written, we can remove the block and
// purely rely on the selected block as the last block.
if !hasWrittenTombstones {
@@ -8736,9 +8771,6 @@ func (fs *fileStore) Truncate(seq uint64) error {
// Reset our subject lookup info.
fs.resetGlobalPerSubjectInfo()
// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++
cb := fs.scb

View File

@@ -515,12 +515,22 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
js.mu.RUnlock()
return errors.New("consumer assignment or group missing")
}
if ca.deleted {
js.mu.RUnlock()
return nil // No further checks, consumer was deleted in the meantime.
}
created := ca.Created
node := ca.Group.node
js.mu.RUnlock()
// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
if time.Since(created) < 5*time.Second {
// No further checks, consumer is not available yet but should be soon.
// We'll start erroring once we're sure this consumer is actually broken.
return nil
}
return errors.New("consumer not found")
}
@@ -7288,6 +7298,7 @@ func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *stre
return nil
}
replicas := cfg.replicas(sa.Config)
peers := copyStrings(sa.Group.Peers)
var _ss [5]string
active := _ss[:0]
@@ -7300,20 +7311,20 @@ func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *stre
}
}
}
if quorum := cfg.Replicas/2 + 1; quorum > len(active) {
if quorum := replicas/2 + 1; quorum > len(active) {
// Not enough active to satisfy the request.
return nil
}
// If we want less then our parent stream, select from active.
if cfg.Replicas > 0 && cfg.Replicas < len(peers) {
if replicas > 0 && replicas < len(peers) {
// Pedantic in case stream is say R5 and consumer is R3 and 3 or more offline, etc.
if len(active) < cfg.Replicas {
if len(active) < replicas {
return nil
}
// First shuffle the active peers and then select to account for replica = 1.
rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] })
peers = active[:cfg.Replicas]
peers = active[:replicas]
}
storage := sa.Config.Storage
if cfg.MemoryStorage {
@@ -7499,12 +7510,6 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// We need to set the ephemeral here before replicating.
if !isDurableConsumer(cfg) {
// We chose to have ephemerals be R=1 unless stream is interest or workqueue.
// Consumer can override.
if sa.Config.Retention == LimitsPolicy && cfg.Replicas <= 1 {
rg.Peers = []string{rg.Preferred}
rg.Name = groupNameForConsumer(rg.Peers, rg.Storage)
}
if cfg.Name != _EMPTY_ {
oname = cfg.Name
} else {

View File

@@ -1100,7 +1100,7 @@ func mqttParsePubRelNATSHeader(headerBytes []byte) uint16 {
if len(pubrelValue) == 0 {
return 0
}
pi, _ := strconv.Atoi(string(pubrelValue))
pi, _ := strconv.ParseUint(string(pubrelValue), 10, 16)
return uint16(pi)
}

View File

@@ -1,4 +1,4 @@
// Copyright 2015-2018 The NATS Authors
// Copyright 2015-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -10,14 +10,36 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copied from pse_openbsd.go
//go:build illumos || solaris
package pse
// This is a placeholder for now.
func ProcUsage(pcpu *float64, rss, vss *int64) error {
*pcpu = 0.0
*rss = 0
*vss = 0
import (
"fmt"
"os"
"os/exec"
"strings"
)
// ProcUsage returns CPU usage
func ProcUsage(pcpu *float64, rss, vss *int64) error {
pidStr := fmt.Sprintf("%d", os.Getpid())
out, err := exec.Command("ps", "-o", "pcpu,rss,vsz", "-p", pidStr).Output()
if err != nil {
*rss, *vss = -1, -1
return fmt.Errorf("ps call failed:%v", err)
}
lines := strings.Split(string(out), "\n")
if len(lines) < 2 {
*rss, *vss = -1, -1
return fmt.Errorf("no ps output")
}
output := lines[1]
fmt.Sscanf(output, "%f %d %d", pcpu, rss, vss)
*rss *= 1024 // 1k blocks, want bytes.
*vss *= 1024 // 1k blocks, want bytes.
return nil
}

View File

@@ -487,12 +487,14 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
ae, err := n.loadEntry(index)
if err != nil {
n.warn("Could not load %d from WAL [%+v]: %v", index, state, err)
truncateAndErr(index)
// Truncate to the previous correct entry.
truncateAndErr(index - 1)
break
}
if ae.pindex != index-1 {
n.warn("Corrupt WAL, will truncate")
truncateAndErr(index)
// Truncate to the previous correct entry.
truncateAndErr(index - 1)
break
}
n.processAppendEntry(ae, nil)
@@ -1100,8 +1102,8 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
if n.applied > n.papplied {
entries = n.applied - n.papplied
}
if n.bytes > 0 {
bytes = entries * n.bytes / (n.pindex - n.papplied)
if msgs := n.pindex - n.papplied; msgs > 0 {
bytes = entries * n.bytes / msgs
}
return entries, bytes
}
@@ -3316,6 +3318,10 @@ func (n *raft) truncateWAL(term, index uint64) {
if n.papplied > n.applied {
n.papplied = n.applied
}
// Refresh bytes count after truncate.
var state StreamState
n.wal.FastState(&state)
n.bytes = state.Bytes
}()
if err := n.wal.Truncate(index); err != nil {
@@ -3372,14 +3378,22 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var scratch [appendEntryResponseLen]byte
arbuf := scratch[:]
// Grab term from append entry. But if leader explicitly defined its term, use that instead.
// This is required during catchup if the leader catches us up on older items from previous terms.
// While still allowing us to confirm they're matching our highest known term.
lterm := ae.term
if ae.lterm != 0 {
lterm = ae.lterm
}
// Are we receiving from another leader.
if n.State() == Leader {
// If we are the same we should step down to break the tie.
if ae.term >= n.term {
if lterm >= n.term {
// If the append entry term is newer than the current term, erase our
// vote.
if ae.term > n.term {
n.term = ae.term
if lterm > n.term {
n.term = lterm
n.vote = noVote
n.writeTermVote()
}
@@ -3391,10 +3405,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("AppendEntry ignoring old term from another leader")
n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf))
arPool.Put(ar)
n.Unlock()
return
}
// Always return here from processing.
n.Unlock()
return
}
// If we received an append entry as a candidate then it would appear that
@@ -3403,11 +3416,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if n.State() == Candidate {
// If we have a leader in the current term or higher, we should stepdown,
// write the term and vote if the term of the request is higher.
if ae.term >= n.term {
if lterm >= n.term {
// If the append entry term is newer than the current term, erase our
// vote.
if ae.term > n.term {
n.term = ae.term
if lterm > n.term {
n.term = lterm
n.vote = noVote
n.writeTermVote()
}
@@ -3431,9 +3444,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}
// If we are catching up ignore old catchup subs.
// If we are/were catching up ignore old catchup subs.
// This could happen when we stall or cancel a catchup.
if !isNew && catchingUp && sub != n.catchup.sub {
if !isNew && sub != nil && (!catchingUp || sub != n.catchup.sub) {
n.Unlock()
n.debug("AppendEntry ignoring old entry from previous catchup")
return
@@ -3466,14 +3479,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}
// Grab term from append entry. But if leader explicitly defined its term, use that instead.
// This is required during catchup if the leader catches us up on older items from previous terms.
// While still allowing us to confirm they're matching our highest known term.
lterm := ae.term
if ae.lterm != 0 {
lterm = ae.lterm
}
// If this term is greater than ours.
if lterm > n.term {
n.term = lterm
@@ -3514,7 +3519,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
n.debug("AppendEntry detected pindex less than/equal to ours: [%d:%d] vs [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse
var success bool
if ae.pindex < n.commit {
@@ -3523,10 +3527,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("AppendEntry pindex %d below commit %d, marking success", ae.pindex, n.commit)
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
// This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && isNew {
success = true
n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex, n.commit)
n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex)
} else if ae.pindex == n.pindex {
// Check if only our terms do not match here.
// Make sure pterms match and we take on the leader's.
@@ -3564,12 +3568,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if !success {
n.cancelCatchup()
}
// Create response.
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success)
// Intentionally not responding. Otherwise, we could erroneously report "success". Reporting
// non-success is not needed either, and would only be wasting messages.
// For example, if we got partial catchup, and then the "real-time" messages came in very delayed.
// If we reported "success" on those "real-time" messages, we'd wrongfully be providing
// quorum while not having an up-to-date log.
n.Unlock()
n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf))
arPool.Put(ar)
return
}
@@ -3722,6 +3726,7 @@ CONTINUE:
// Only ever respond to new entries.
// Never respond to catchup messages, because providing quorum based on this is unsafe.
// The only way for the leader to receive "success" MUST be through this path.
var ar *appendEntryResponse
if sub != nil && isNew {
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true)

View File

@@ -209,7 +209,7 @@ func transformIndexIntArgsHelper(token string, args []string, transformType int1
if err != nil {
return BadTransform, []int{}, -1, _EMPTY_, &mappingDestinationErr{token, ErrMappingDestinationInvalidArg}
}
mappingFunctionIntArg, err := strconv.Atoi(strings.Trim(args[1], " "))
mappingFunctionIntArg, err := strconv.ParseInt(strings.Trim(args[1], " "), 10, 32)
if err != nil {
return BadTransform, []int{}, -1, _EMPTY_, &mappingDestinationErr{token, ErrMappingDestinationInvalidArg}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2025 The NATS Authors
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -11,37 +11,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build gofuzz
//go:build illumos || solaris
package server
package sysmem
var defaultFuzzServerOptions = Options{
Host: "127.0.0.1",
Trace: true,
Debug: true,
DisableShortFirstPing: true,
NoLog: true,
NoSigs: true,
}
import (
"golang.org/x/sys/unix"
)
func dummyFuzzClient() *client {
return &client{srv: New(&defaultFuzzServerOptions), msubs: -1, mpay: MAX_PAYLOAD_SIZE, mcl: MAX_CONTROL_LINE_SIZE}
}
const (
_SC_PHYS_PAGES = 500
_SC_PAGESIZE = 11
)
func FuzzClient(data []byte) int {
if len(data) < 100 {
return -1
}
c := dummyFuzzClient()
err := c.parse(data[:50])
func Memory() int64 {
pages, err := unix.Sysconf(_SC_PHYS_PAGES)
if err != nil {
return 0
}
err = c.parse(data[50:])
pageSize, err := unix.Sysconf(_SC_PAGESIZE)
if err != nil {
return 0
}
return 1
return int64(pages) * int64(pageSize)
}

2
vendor/modules.txt vendored
View File

@@ -1053,7 +1053,7 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.7.4
## explicit; go 1.23.0
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.11.7
# github.com/nats-io/nats-server/v2 v2.11.8
## explicit; go 1.23.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand