build(deps): bump github.com/nats-io/nats-server/v2

Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.12.0 to 2.12.1.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.12.0...v2.12.1)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-version: 2.12.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2025-11-04 09:33:11 +00:00
committed by Ralf Haferkamp
parent 6e0bb09aff
commit dcaa1ceadb
30 changed files with 1193 additions and 612 deletions

6
go.mod
View File

@@ -55,7 +55,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.12.0
github.com/nats-io/nats-server/v2 v2.12.1
github.com/nats-io/nats.go v1.47.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.1.0
@@ -236,7 +236,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.9.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/go-tpm v0.9.5 // indirect
github.com/google/go-tpm v0.9.6 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
github.com/google/renameio/v2 v2.0.0 // indirect
github.com/gookit/goutil v0.7.1 // indirect
@@ -377,7 +377,7 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/mod v0.28.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/time v0.13.0 // indirect
golang.org/x/time v0.14.0 // indirect
golang.org/x/tools v0.37.0 // indirect
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 // indirect

12
go.sum
View File

@@ -565,8 +565,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/go-tika v0.3.1 h1:l+jr10hDhZjcgxFRfcQChRLo1bPXQeLFluMyvDhXTTA=
github.com/google/go-tika v0.3.1/go.mod h1:DJh5N8qxXIl85QkqmXknd+PeeRkUOTbvwyYf7ieDz6c=
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA=
github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -899,8 +899,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74=
github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
github.com/nats-io/nats-server/v2 v2.12.1 h1:0tRrc9bzyXEdBLcHr2XEjDzVpUxWx64aZBm7Rl1QDrA=
github.com/nats-io/nats-server/v2 v2.12.1/go.mod h1:OEaOLmu/2e6J9LzUt2OuGjgNem4EpYApO5Rpf26HDs8=
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
@@ -1591,8 +1591,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -4486,7 +4486,10 @@ func (dr *DirAccResolver) Start(s *Server) error {
}); err != nil {
return fmt.Errorf("error setting up list request handling: %v", err)
}
if _, err := s.sysSubscribe(accDeleteReqSubj, func(_ *subscription, _ *client, _ *Account, _, reply string, msg []byte) {
if _, err := s.sysSubscribe(accDeleteReqSubj, func(_ *subscription, c *client, _ *Account, _, reply string, msg []byte) {
// As this is a raw message, we need to extract payload and only decode claims from it,
// in case request is sent with headers.
_, msg = c.msgParts(msg)
handleDeleteRequest(dr.DirJWTStore, s, msg, reply)
}); err != nil {
return fmt.Errorf("error setting up delete request handling: %v", err)
@@ -4753,7 +4756,10 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
}); err != nil {
return fmt.Errorf("error setting up list request handling: %v", err)
}
if _, err := s.sysSubscribe(accDeleteReqSubj, func(_ *subscription, _ *client, _ *Account, _, reply string, msg []byte) {
if _, err := s.sysSubscribe(accDeleteReqSubj, func(_ *subscription, c *client, _ *Account, _, reply string, msg []byte) {
// As this is a raw message, we need to extract payload and only decode claims from it,
// in case request is sent with headers.
_, msg = c.msgParts(msg)
handleDeleteRequest(dr.DirJWTStore, s, msg, reply)
}); err != nil {
return fmt.Errorf("error setting up list request handling: %v", err)

View File

@@ -14,6 +14,7 @@
package server
import (
"crypto/fips140"
"crypto/tls"
)
@@ -52,6 +53,16 @@ var curvePreferenceMap = map[string]tls.CurveID{
// reorder to default to the highest level of security. See:
// https://blog.bracebin.com/achieving-perfect-ssl-labs-score-with-go
func defaultCurvePreferences() []tls.CurveID {
if fips140.Enabled() {
// X25519 is not FIPS-approved by itself, but it is when
// combined with MLKEM768.
return []tls.CurveID{
tls.X25519MLKEM768, // post-quantum
tls.CurveP256,
tls.CurveP384,
tls.CurveP521,
}
}
return []tls.CurveID{
tls.X25519MLKEM768, // post-quantum
tls.X25519, // faster than P256, arguably more secure

View File

@@ -690,6 +690,14 @@ func (c *client) initClient() {
opts := s.getOpts()
// Snapshots to avoid mutex access in fast paths.
c.out.wdl = opts.WriteDeadline
switch {
case c.kind == ROUTER && opts.Cluster.WriteDeadline > 0:
c.out.wdl = opts.Cluster.WriteDeadline
case c.kind == GATEWAY && opts.Gateway.WriteDeadline > 0:
c.out.wdl = opts.Gateway.WriteDeadline
case c.kind == LEAF && opts.LeafNode.WriteDeadline > 0:
c.out.wdl = opts.LeafNode.WriteDeadline
}
c.out.mp = opts.MaxPending
// Snapshot max control line since currently can not be changed on reload and we
// were checking it on each call to parse. If this changes and we allow MaxControlLine

View File

@@ -66,7 +66,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.12.0"
VERSION = "2.12.1"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -1978,5 +1978,25 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSAtomicPublishInvalidBatchCommitErr",
"code": 400,
"error_code": 10200,
"description": "atomic publish batch commit is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSAtomicPublishContainsDuplicateMessageErr",
"code": 400,
"error_code": 10201,
"description": "atomic publish batch contains duplicate message id",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]

View File

@@ -179,7 +179,9 @@ type fileStore struct {
scb StorageUpdateHandler
rmcb StorageRemoveMsgHandler
pmsgcb ProcessJetStreamMsgHandler
ageChk *time.Timer
ageChk *time.Timer // Timer to expire messages.
ageChkRun bool // Whether message expiration is currently running.
ageChkTime int64 // When the message expiration is scheduled to run.
syncTmr *time.Timer
cfg FileStreamInfo
fcfg FileStoreConfig
@@ -268,7 +270,6 @@ type cache struct {
buf []byte
wp int
idx []uint32
lrl uint32
fseq uint64
nra bool
}
@@ -354,6 +355,9 @@ const (
// For smaller reuse buffers. Usually being generated during contention on the lead write buffer.
// E.g. mirrors/sources etc.
defaultSmallBlockSize = 1 * 1024 * 1024 // 1MB
// NOT an actual block size, but used for the sync.Pools, so that we don't allocate huge buffers
// unnecessarily until there are enough writes to justify it.
defaultTinyBlockSize = 1 * 1024 * 256 // 256KB
// Maximum size for the encrypted head block.
maximumEncryptedBlockSize = 2 * 1024 * 1024 // 2MB
// Default for KV based
@@ -692,6 +696,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
if fs.ageChk != nil && fs.cfg.MaxAge == 0 {
fs.ageChk.Stop()
fs.ageChk = nil
fs.ageChkTime = 0
}
if fs.cfg.MaxMsgsPer > 0 && (old_cfg.MaxMsgsPer == 0 || fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer) {
@@ -944,6 +949,12 @@ func (fs *fileStore) writeStreamMeta() error {
}
// Pools to recycle the blocks to help with memory pressure.
var blkPoolTiny = &sync.Pool{
New: func() any {
b := [defaultTinyBlockSize]byte{}
return &b
},
}
var blkPoolSmall = &sync.Pool{
New: func() any {
b := [defaultSmallBlockSize]byte{}
@@ -966,6 +977,8 @@ var blkPoolBig = &sync.Pool{
// Get a new msg block based on sz estimate.
func getMsgBlockBuf(sz int) (buf []byte) {
switch {
case sz <= defaultTinyBlockSize:
return blkPoolTiny.Get().(*[defaultTinyBlockSize]byte)[:0]
case sz <= defaultSmallBlockSize:
return blkPoolSmall.Get().(*[defaultSmallBlockSize]byte)[:0]
case sz <= defaultMediumBlockSize:
@@ -983,6 +996,9 @@ func getMsgBlockBuf(sz int) (buf []byte) {
// Recycle the msg block.
func recycleMsgBlockBuf(buf []byte) {
switch cap(buf) {
case defaultTinyBlockSize:
b := (*[defaultTinyBlockSize]byte)(buf[0:defaultTinyBlockSize])
blkPoolTiny.Put(b)
case defaultSmallBlockSize:
b := (*[defaultSmallBlockSize]byte)(buf[0:defaultSmallBlockSize])
blkPoolSmall.Put(b)
@@ -1503,12 +1519,30 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
// For tombstones that we find and collect.
var (
tombstones []uint64
minTombstoneSeq uint64
minTombstoneTs int64
maxTombstoneSeq uint64
maxTombstoneTs int64
)
// To detect gaps from compaction, and to ensure the sequence keeps moving up.
var last uint64
var hb [highwayhash.Size64]byte
updateLast := func(seq uint64, ts int64) {
// The sequence needs to only ever move up.
if seq <= last {
return
}
// Check for any gaps from compaction, meaning no ebit entry.
if last > 0 && seq != last+1 && mb.msgs != 0 {
for dseq := last + 1; dseq < seq; dseq++ {
addToDmap(dseq)
}
}
last = seq
atomic.StoreUint64(&mb.last.seq, last)
mb.last.ts = ts
}
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
@@ -1545,7 +1579,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
checksum := hh.Sum(hb[:0])
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), tombstones, errBadMsg{mb.mfn, "invalid checksum"}
@@ -1562,8 +1596,8 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
seq = seq &^ tbit
// Need to process this here and make sure we have accounted for this properly.
tombstones = append(tombstones, seq)
if minTombstoneSeq == 0 || seq < minTombstoneSeq {
minTombstoneSeq, minTombstoneTs = seq, ts
if maxTombstoneSeq == 0 || seq > maxTombstoneSeq {
maxTombstoneSeq, maxTombstoneTs = seq, ts
}
index += rl
continue
@@ -1574,8 +1608,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
if seq == 0 || seq&ebit != 0 || seq < fseq {
seq = seq &^ ebit
if seq >= fseq {
atomic.StoreUint64(&mb.last.seq, seq)
mb.last.ts = ts
updateLast(seq, ts)
if mb.msgs == 0 {
atomic.StoreUint64(&mb.first.seq, seq+1)
mb.first.ts = 0
@@ -1623,17 +1656,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
}
}
// Check for any gaps from compaction, meaning no ebit entry.
if last > 0 && seq != last+1 {
for dseq := last + 1; dseq < seq; dseq++ {
addToDmap(dseq)
}
}
// Always set last
last = seq
atomic.StoreUint64(&mb.last.seq, last)
mb.last.ts = ts
updateLast(seq, ts)
// Advance to next record.
index += rl
@@ -1646,12 +1669,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
fseq := atomic.LoadUint64(&mb.first.seq)
if fseq > 0 {
atomic.StoreUint64(&mb.last.seq, fseq-1)
} else if fseq == 0 && minTombstoneSeq > 0 {
atomic.StoreUint64(&mb.first.seq, minTombstoneSeq+1)
} else if fseq == 0 && maxTombstoneSeq > 0 {
atomic.StoreUint64(&mb.first.seq, maxTombstoneSeq+1)
mb.first.ts = 0
if mb.last.seq == 0 {
atomic.StoreUint64(&mb.last.seq, minTombstoneSeq)
mb.last.ts = minTombstoneTs
atomic.StoreUint64(&mb.last.seq, maxTombstoneSeq)
mb.last.ts = maxTombstoneTs
}
}
}
@@ -1739,7 +1762,8 @@ func (fs *fileStore) recoverFullState() (rerr error) {
buf = buf[:len(buf)-highwayhash.Size64]
fs.hh.Reset()
fs.hh.Write(buf)
if !bytes.Equal(h, fs.hh.Sum(nil)) {
var hb [highwayhash.Size64]byte
if !bytes.Equal(h, fs.hh.Sum(hb[:0])) {
os.Remove(fn)
fs.warn("Stream state checksum did not match")
return errCorruptState
@@ -2266,6 +2290,12 @@ func (fs *fileStore) recoverMsgs() error {
fs.removeMsgBlockFromList(mb)
continue
}
// If the stream is empty, reset the first/last sequences so these can
// properly move up based purely on tombstones spread over multiple blocks.
if fs.state.Msgs == 0 {
fs.state.FirstSeq, fs.state.LastSeq = 0, 0
fs.state.FirstTime, fs.state.LastTime = time.Time{}, time.Time{}
}
fseq := atomic.LoadUint64(&mb.first.seq)
if fs.state.FirstSeq == 0 || (fseq < fs.state.FirstSeq && mb.first.ts != 0) {
fs.state.FirstSeq = fseq
@@ -2541,7 +2571,6 @@ func copyMsgBlocks(src []*msgBlock) []*msgBlock {
// GetSeqFromTime looks for the first sequence number that has
// the message with >= timestamp.
// FIXME(dlc) - inefficient, and dumb really. Make this better.
func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
fs.mu.RLock()
lastSeq := fs.state.LastSeq
@@ -2561,14 +2590,17 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
lseq := atomic.LoadUint64(&mb.last.seq)
var smv StoreMsg
// Linear search, hence the dumb part..
ts := t.UnixNano()
for seq := fseq; seq <= lseq; seq++ {
sm, _, _ := mb.fetchMsgNoCopy(seq, &smv)
if sm != nil && sm.ts >= ts {
return sm.seq
}
// Because sort.Search expects range [0,off), we have to manually
// calculate the offset from the first sequence.
off := int(lseq - fseq + 1)
i := sort.Search(off, func(i int) bool {
sm, _, _ := mb.fetchMsgNoCopy(fseq+uint64(i), &smv)
return sm != nil && sm.ts >= ts
})
if i < off {
return fseq + uint64(i)
}
return 0
}
@@ -4426,6 +4458,8 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t
if schedule, ok := getMessageSchedule(hdr); ok && !schedule.IsZero() {
fs.scheduling.add(seq, subj, schedule.UnixNano())
fs.lmb.schedules++
} else {
fs.scheduling.removeSubject(subj)
}
}
@@ -4441,9 +4475,7 @@ func (fs *fileStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t
// sooner than initial replica expiry timer set to MaxAge when initializing.
if !fs.receivedAny && fs.cfg.MaxAge != 0 && ts > 0 {
fs.receivedAny = true
// don't block here by calling expireMsgs directly.
// Instead, set short timeout.
fs.resetAgeChk(int64(time.Millisecond * 50))
fs.resetAgeChk(0)
}
fs.mu.Unlock()
@@ -4515,19 +4547,27 @@ func (mb *msgBlock) skipMsg(seq uint64, now int64) {
}
// SkipMsg will use the next sequence number but not store anything.
func (fs *fileStore) SkipMsg() uint64 {
func (fs *fileStore) SkipMsg(seq uint64) (uint64, error) {
// Grab time.
now := ats.AccessTime()
fs.mu.Lock()
defer fs.mu.Unlock()
// Check sequence matches our last sequence.
if seq != fs.state.LastSeq+1 {
if seq > 0 {
return 0, ErrSequenceMismatch
}
seq = fs.state.LastSeq + 1
}
// Grab our current last message block.
mb, err := fs.checkLastBlock(emptyRecordLen)
if err != nil {
return 0
return 0, err
}
// Grab time and last seq.
now, seq := ats.AccessTime(), fs.state.LastSeq+1
// Write skip msg.
mb.skipMsg(seq, now)
@@ -4542,7 +4582,7 @@ func (fs *fileStore) SkipMsg() uint64 {
// Mark as dirty for stream state.
fs.dirty++
return seq
return seq, nil
}
// Skip multiple msgs. We will determine if we can fit into current lmb or we need to create a new block.
@@ -5030,6 +5070,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// If we are tracking multiple subjects here make sure we update that accounting.
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
if fs.ttls != nil {
if ttl, err := getMessageTTL(sm.hdr); err == nil {
expires := time.Duration(sm.ts) + (time.Second * time.Duration(ttl))
fs.ttls.Remove(seq, int64(expires))
}
}
fifo := seq == atomic.LoadUint64(&mb.first.seq)
isLastBlock := mb == fs.lmb
@@ -5037,10 +5083,18 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// If erase but block is empty, we can simply remove the block later.
if secure && !isEmpty {
// Grab record info.
ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq))
if err := mb.eraseMsg(seq, int(ri), int(rl), isLastBlock); err != nil {
// Grab record info, but use the pre-computed record length.
ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq))
if err != nil {
mb.finishedWithCache()
mb.mu.Unlock()
fsUnlock()
return false, err
}
if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil {
mb.finishedWithCache()
mb.mu.Unlock()
fsUnlock()
return false, err
}
}
@@ -5283,7 +5337,14 @@ func (mb *msgBlock) compactWithFloor(floor uint64) {
// Grab info from a slot.
// Lock should be held.
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
if slot < 0 || mb.cache == nil || slot >= len(mb.cache.idx) {
switch {
case mb.cache == nil: // Shouldn't be possible, but check it anyway.
return 0, 0, false, errNoCache
case slot < 0:
mb.fs.warn("Partial cache: offset slot index %d is less zero", slot)
return 0, 0, false, errPartialCache
case slot >= len(mb.cache.idx):
mb.fs.warn("Partial cache: offset slot index %d is greater than index len %d", slot, len(mb.cache.idx))
return 0, 0, false, errPartialCache
}
@@ -5297,24 +5358,20 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
// Determine record length
var rl uint32
if slot >= len(mb.cache.idx) {
rl = mb.cache.lrl
} else {
// Need to account for dbit markers in idx.
// So we will walk until we find valid idx slot to calculate rl.
for i := 1; slot+i < len(mb.cache.idx); i++ {
ni := mb.cache.idx[slot+i] &^ cbit
if ni == dbit {
continue
}
rl = ni - ri
break
}
// check if we had all trailing dbits.
// If so use len of cache buf minus ri.
if rl == 0 {
rl = uint32(len(mb.cache.buf)) - ri
// Need to account for dbit markers in idx.
// So we will walk until we find valid idx slot to calculate rl.
for i := 1; slot+i < len(mb.cache.idx); i++ {
ni := mb.cache.idx[slot+i] &^ cbit
if ni == dbit {
continue
}
rl = ni - ri
break
}
// check if we had all trailing dbits.
// If so use len of cache buf minus ri.
if rl == 0 {
rl = uint32(len(mb.cache.buf)) - ri
}
if rl < msgHdrSize {
return 0, 0, false, errBadMsg{mb.mfn, fmt.Sprintf("length too short for slot %d", slot)}
@@ -5463,7 +5520,8 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int, isLastBlock bool) error {
mb.hh.Reset()
mb.hh.Write(hdr[4:20])
mb.hh.Write(data)
checksum := mb.hh.Sum(nil)
var hb [highwayhash.Size64]byte
checksum := mb.hh.Sum(hb[:0])
// Write to msg record.
b.Write(checksum)
@@ -5637,9 +5695,21 @@ func (mb *msgBlock) selectNextFirst() {
}
// Select the next FirstSeq
// Also cleans up empty blocks at the start only containing tombstones.
// Lock should be held.
func (fs *fileStore) selectNextFirst() {
if len(fs.blks) > 0 {
for len(fs.blks) > 1 {
mb := fs.blks[0]
mb.mu.Lock()
empty := mb.msgs == 0
if !empty {
mb.mu.Unlock()
break
}
fs.forceRemoveMsgBlock(mb)
mb.mu.Unlock()
}
mb := fs.blks[0]
mb.mu.RLock()
fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq)
@@ -5726,10 +5796,10 @@ func (mb *msgBlock) tryForceExpireCache() {
// We will attempt to force expire this by temporarily clearing the last load time.
func (mb *msgBlock) tryForceExpireCacheLocked() {
llts := mb.llts
mb.llts = 0
llts, lwts := mb.llts, mb.lwts
mb.llts, mb.lwts = 0, 0
mb.expireCacheLocked()
mb.llts = llts
mb.llts, mb.lwts = llts, lwts
}
// This is for expiration of the write cache, which will be partial with fip.
@@ -5804,6 +5874,7 @@ func (mb *msgBlock) expireCacheLocked() {
recycleMsgBlockBuf(mb.cache.buf)
}
mb.cache.buf = nil
mb.cache.idx = mb.cache.idx[:0]
mb.cache.wp = 0
}
@@ -5823,6 +5894,12 @@ func (fs *fileStore) startAgeChk() {
// Lock should be held.
func (fs *fileStore) resetAgeChk(delta int64) {
// If we're already expiring messages, it will make sure to reset.
// Don't trigger again, as that could result in many expire goroutines.
if fs.ageChkRun {
return
}
var next int64 = math.MaxInt64
if fs.ttls != nil {
next = fs.ttls.GetNextExpiration(next)
@@ -5868,6 +5945,14 @@ func (fs *fileStore) resetAgeChk(delta int64) {
fireIn = 250 * time.Millisecond
}
// If we want to kick the timer to run later than what was assigned before, don't reset it.
// Otherwise, we could get in a situation where the timer is continuously reset, and it never runs.
expires := ats.AccessTime() + fireIn.Nanoseconds()
if fs.ageChkTime > 0 && expires > fs.ageChkTime {
return
}
fs.ageChkTime = expires
if fs.ageChk != nil {
fs.ageChk.Reset(fireIn)
} else {
@@ -5880,6 +5965,7 @@ func (fs *fileStore) cancelAgeChk() {
if fs.ageChk != nil {
fs.ageChk.Stop()
fs.ageChk = nil
fs.ageChkTime = 0
}
}
@@ -5890,18 +5976,22 @@ func (fs *fileStore) expireMsgs() {
var smv StoreMsg
var sm *StoreMsg
fs.mu.RLock()
fs.mu.Lock()
maxAge := int64(fs.cfg.MaxAge)
minAge := ats.AccessTime() - maxAge
rmcb := fs.rmcb
pmsgcb := fs.pmsgcb
sdmTTL := int64(fs.cfg.SubjectDeleteMarkerTTL.Seconds())
sdmEnabled := sdmTTL > 0
fs.mu.RUnlock()
// If SDM is enabled, but handlers aren't set up yet. Try again later.
if sdmEnabled && (rmcb == nil || pmsgcb == nil) {
fs.resetAgeChk(0)
fs.mu.Unlock()
return
}
fs.ageChkRun = true
fs.mu.Unlock()
if maxAge > 0 {
var seq uint64
@@ -5917,7 +6007,7 @@ func (fs *fileStore) expireMsgs() {
// if it was the last message of that particular subject that we just deleted.
if sdmEnabled {
if last, ok := fs.shouldProcessSdm(seq, sm.subj); ok {
sdm := last && isSubjectDeleteMarker(sm.hdr)
sdm := last && !isSubjectDeleteMarker(sm.hdr)
fs.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
@@ -5929,95 +6019,92 @@ func (fs *fileStore) expireMsgs() {
minAge = ats.AccessTime() - maxAge
}
}
var ageDelta int64
if sm != nil {
ageDelta = sm.ts - minAge
}
fs.mu.Lock()
defer fs.mu.Unlock()
// TODO: Not great that we're holding the lock here, but the timed hash wheel isn't thread-safe.
nextTTL := int64(math.MaxInt64)
var rmSeqs []uint64
var ttlSdm map[string][]SDMBySubj
var rmSeqs []thw.HashWheelEntry
if fs.ttls != nil {
fs.ttls.ExpireTasks(func(seq uint64, ts int64) bool {
// Need to grab subject for the specified sequence if for SDM, and check
// if the message hasn't been removed in the meantime.
sm, _ = fs.msgForSeqLocked(seq, &smv, false)
if sm == nil {
return true
}
if sdmEnabled {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)})
} else {
// Collect sequences to remove. Don't remove messages inline here,
// as that releases the lock and THW is not thread-safe.
rmSeqs = append(rmSeqs, seq)
}
// Removing messages out of band, those can fail, and we can be shutdown halfway
rmSeqs = append(rmSeqs, thw.HashWheelEntry{Seq: seq, Expires: ts})
// We might need to remove messages out of band, those can fail, and we can be shutdown halfway
// through so don't remove from THW just yet.
return false
})
if maxAge > 0 {
// Only check if we're expiring something in the next MaxAge interval, saves us a bit
// of work if MaxAge will beat us to the next expiry anyway.
nextTTL = fs.ttls.GetNextExpiration(time.Now().Add(time.Duration(maxAge)).UnixNano())
} else {
nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64)
}
nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64)
}
// Remove messages collected by THW.
for _, seq := range rmSeqs {
fs.removeMsg(seq, false, false, false)
}
// THW is unordered, so must sort by sequence and must not be holding the lock.
if len(ttlSdm) > 0 {
if !sdmEnabled {
for _, rm := range rmSeqs {
fs.removeMsg(rm.Seq, false, false, false)
}
} else {
// THW is unordered, so must sort by sequence and must not be holding the lock.
fs.mu.Unlock()
for subj, es := range ttlSdm {
slices.SortFunc(es, func(a, b SDMBySubj) int {
if a.seq == b.seq {
return 0
} else if a.seq < b.seq {
return -1
} else {
return 1
}
})
for _, e := range es {
if last, ok := fs.shouldProcessSdm(e.seq, subj); ok {
sdm := last && !e.sdm
fs.handleRemovalOrSdm(e.seq, subj, sdm, sdmTTL)
}
slices.SortFunc(rmSeqs, func(a, b thw.HashWheelEntry) int {
if a.Seq == b.Seq {
return 0
} else if a.Seq < b.Seq {
return -1
} else {
return 1
}
})
for _, rm := range rmSeqs {
// Need to grab subject for the specified sequence if for SDM, and check
// if the message hasn't been removed in the meantime.
// We need to grab the message and check if we should process SDM while holding the lock,
// otherwise we can race if a deletion of this message is in progress.
fs.mu.Lock()
sm, _ = fs.msgForSeqLocked(rm.Seq, &smv, false)
if sm == nil {
fs.ttls.Remove(rm.Seq, rm.Expires)
fs.mu.Unlock()
continue
}
last, ok := fs.shouldProcessSdmLocked(rm.Seq, sm.subj)
fs.mu.Unlock()
if ok {
sdm := last && !isSubjectDeleteMarker(sm.hdr)
fs.handleRemovalOrSdm(rm.Seq, sm.subj, sdm, sdmTTL)
}
}
fs.mu.Lock()
}
// Only cancel if no message left, not on potential lookup error that would result in sm == nil.
fs.ageChkRun, fs.ageChkTime = false, 0
if fs.state.Msgs == 0 && nextTTL == math.MaxInt64 {
fs.cancelAgeChk()
} else {
if sm == nil {
fs.resetAgeChk(0)
} else {
fs.resetAgeChk(sm.ts - minAge)
}
fs.resetAgeChk(ageDelta)
}
}
func (fs *fileStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) {
fs.mu.Lock()
defer fs.mu.Unlock()
return fs.shouldProcessSdmLocked(seq, subj)
}
// Lock should be held.
func (fs *fileStore) shouldProcessSdmLocked(seq uint64, subj string) (bool, bool) {
if fs.sdm == nil {
fs.sdm = newSDMMeta()
}
if p, ok := fs.sdm.pending[seq]; ok {
// Don't allow more proposals for the same sequence if we already did recently.
if time.Since(time.Unix(0, p.ts)) < 2*time.Second {
return p.last, false
}
// If we're about to use the cached value, and we knew it was last before,
// quickly check that we don't have more remaining messages for the subject now.
// Which means we are not the last anymore and must reset to not remove later data.
@@ -6028,11 +6115,6 @@ func (fs *fileStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) {
p.last = false
}
}
// Don't allow more proposals for the same sequence if we already did recently.
if time.Since(time.Unix(0, p.ts)) < 2*time.Second {
return p.last, false
}
fs.sdm.pending[seq] = SDMBySeq{p.last, time.Now().UnixNano()}
return p.last, true
}
@@ -6072,9 +6154,15 @@ func (fs *fileStore) runMsgScheduling() {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.scheduling == nil || fs.pmsgcb == nil {
// If scheduling is enabled, but handler isn't set up yet. Try again later.
if fs.scheduling == nil {
return
}
if fs.pmsgcb == nil {
fs.scheduling.resetTimer()
return
}
fs.scheduling.running = true
scheduledMsgs := fs.scheduling.getScheduledMessages(func(seq uint64, smv *StoreMsg) *StoreMsg {
sm, _ := fs.msgForSeqLocked(seq, smv, false)
@@ -6088,9 +6176,8 @@ func (fs *fileStore) runMsgScheduling() {
fs.mu.Lock()
}
if fs.scheduling != nil {
fs.scheduling.resetTimer()
}
fs.scheduling.running, fs.scheduling.deadline = false, 0
fs.scheduling.resetTimer()
}
// Lock should be held.
@@ -6219,6 +6306,17 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg
// reference. It will now stay strong until the flusher decides it is time to weaken.
mb.ecache.Strengthen()
// Make sure we have enough space to write into. If we don't then we can pull a buffer
// from the next pool size up to save us from reallocating in append() below.
if nsz := len(mb.cache.buf) + int(rl); cap(mb.cache.buf) < nsz {
prev := mb.cache.buf
mb.cache.buf = getMsgBlockBuf(nsz)
if prev != nil {
mb.cache.buf = mb.cache.buf[:copy(mb.cache.buf[:nsz], prev)]
recycleMsgBlockBuf(prev)
}
}
// Indexing
index := len(mb.cache.buf)
@@ -6269,7 +6367,6 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg
// Update write through cache.
// Write to msg record.
mb.cache.buf = append(mb.cache.buf, checksum...)
mb.cache.lrl = uint32(rl)
// Set cache timestamp for last store.
mb.lwts = ts
@@ -6835,14 +6932,25 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
fs.mu.RLock()
defer fs.mu.RUnlock()
t := minTime.UnixNano()
for _, mb := range fs.blks {
// Binary search for first block where last.ts >= t.
i, _ := slices.BinarySearchFunc(fs.blks, minTime.UnixNano(), func(mb *msgBlock, target int64) int {
mb.mu.RLock()
found := t <= mb.last.ts
last := mb.last.ts
mb.mu.RUnlock()
if found {
return mb
switch {
case last < target:
return -1
case last > target:
return 1
default:
return 0
}
})
// BinarySearchFunc returns an insertion point if not found.
// Either way, i is the index of the first mb where mb.last.ts >= t.
if i < len(fs.blks) {
return fs.blks[i]
}
return nil
}
@@ -6967,7 +7075,6 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
}
// Add to our index.
idx = append(idx, index)
mb.cache.lrl = uint32(rl)
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
fseq = seq
@@ -7471,7 +7578,14 @@ func (mb *msgBlock) cacheLookupNoCopy(seq uint64, sm *StoreMsg) (*StoreMsg, erro
// Will do a lookup from cache.
// Lock should be held.
func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*StoreMsg, error) {
if seq < atomic.LoadUint64(&mb.first.seq) || seq > atomic.LoadUint64(&mb.last.seq) {
fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)
switch {
case lseq == fseq-1:
// The block is empty, no messages have been written yet. This works because
// newMsgBlockForWrite sets fseq=fs.State.LastSeq+1 and lseq=fs.State.LastSeq.
return nil, ErrStoreMsgNotFound
case seq < fseq || seq > lseq:
// Sequence is out of range for this block.
return nil, ErrStoreMsgNotFound
}
@@ -7508,7 +7622,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
}
// Check partial cache status.
if seq < mb.cache.fseq {
mb.fs.warn("Cache lookup detected partial cache: seq %d vs cache fseq %d", seq, mb.cache.fseq)
mb.fs.warn("Partial cache: seq %d is less than cache fseq %d", seq, mb.cache.fseq)
return nil, errPartialCache
}
@@ -7522,6 +7636,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
li := int(bi)
if li >= len(mb.cache.buf) {
mb.fs.warn("Partial cache: slot index %d is less than cache buffer len %d", li, len(mb.cache.buf))
return nil, errPartialCache
}
buf := mb.cache.buf[li:]
@@ -7544,8 +7659,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
}
if seq != fsm.seq { // See TestFileStoreInvalidIndexesRebuilt.
recycleMsgBlockBuf(mb.cache.buf)
mb.cache.buf = nil
mb.tryForceExpireCacheLocked()
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, fsm.seq)
}
@@ -7669,7 +7783,8 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) {
var hb [highwayhash.Size64]byte
if !bytes.Equal(hh.Sum(hb[:0]), data[len(data)-8:]) {
return nil, errBadMsg{mb.mfn, "invalid checksum"}
}
}
@@ -9064,7 +9179,7 @@ func (fs *fileStore) Truncate(seq uint64) error {
}
mb.mu.Lock()
}
fs.removeMsgBlock(mb)
fs.forceRemoveMsgBlock(mb)
mb.mu.Unlock()
}
@@ -9086,7 +9201,7 @@ func (fs *fileStore) Truncate(seq uint64) error {
}
smb.mu.Lock()
}
fs.removeMsgBlock(smb)
fs.forceRemoveMsgBlock(smb)
smb.mu.Unlock()
goto SKIP
}
@@ -9128,7 +9243,7 @@ SKIP:
if !hasWrittenTombstones {
fs.lmb = smb
tmb.mu.Lock()
fs.removeMsgBlock(tmb)
fs.forceRemoveMsgBlock(tmb)
tmb.mu.Unlock()
}
@@ -9208,8 +9323,8 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) {
// Both locks should be held.
func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
// Check for us being last message block
lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts
if mb == fs.lmb {
lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts
// Creating a new message write block requires that the lmb lock is not held.
mb.mu.Unlock()
// Write the tombstone to remember since this was last block.
@@ -9217,8 +9332,17 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
fs.writeTombstone(lseq, lts)
}
mb.mu.Lock()
} else if lseq == fs.state.LastSeq {
// Need to write a tombstone for the last sequence if we're removing the block containing it.
fs.writeTombstone(lseq, lts)
}
// Only delete message block after (potentially) writing a new lmb.
// Only delete message block after (potentially) writing a tombstone.
fs.forceRemoveMsgBlock(mb)
}
// Removes the msgBlock, without writing tombstones to ensure the last sequence is preserved.
// Both locks should be held.
func (fs *fileStore) forceRemoveMsgBlock(mb *msgBlock) {
mb.dirtyCloseWithRemove(true)
fs.removeMsgBlockFromList(mb)
}
@@ -9975,11 +10099,7 @@ func (fs *fileStore) writeTTLState() error {
buf := fs.ttls.Encode(fs.state.LastSeq + 1)
fs.mu.RUnlock()
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
dios <- struct{}{}
return err
return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms)
}
func (fs *fileStore) writeMsgSchedulingState() error {
@@ -9993,11 +10113,7 @@ func (fs *fileStore) writeMsgSchedulingState() error {
buf := fs.scheduling.encode(fs.state.LastSeq + 1)
fs.mu.RUnlock()
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
dios <- struct{}{}
return err
return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms)
}
// Stop the current filestore.
@@ -11671,29 +11787,47 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {
// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is
// handled automatically by this function, so don't wrap calls to it in dios.
func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error {
if fs.fcfg.SyncAlways {
return writeFileWithSync(name, data, perm)
}
<-dios
defer func() {
dios <- struct{}{}
}()
return os.WriteFile(name, data, perm)
return writeAtomically(name, data, perm, fs.fcfg.SyncAlways)
}
func writeFileWithSync(name string, data []byte, perm fs.FileMode) error {
return writeAtomically(name, data, perm, true)
}
func writeAtomically(name string, data []byte, perm fs.FileMode, sync bool) error {
tmp := name + ".tmp"
flags := os.O_CREATE | os.O_WRONLY | os.O_TRUNC
if sync {
flags = flags | os.O_SYNC
}
<-dios
defer func() {
dios <- struct{}{}
}()
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC
f, err := os.OpenFile(name, flags, perm)
f, err := os.OpenFile(tmp, flags, perm)
if err != nil {
return err
}
if _, err = f.Write(data); err != nil {
if _, err := f.Write(data); err != nil {
_ = f.Close()
_ = os.Remove(tmp)
return err
}
return f.Close()
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
if err := os.Rename(tmp, name); err != nil {
_ = os.Remove(tmp)
return err
}
if sync {
// To ensure that the file rename was persisted on all filesystems,
// also try to flush the directory metadata.
if d, err := os.Open(filepath.Dir(name)); err == nil {
_ = d.Sync()
_ = d.Close()
}
}
return nil
}

View File

@@ -408,6 +408,9 @@ func (n *node[T]) isEmpty() bool {
// Return the number of nodes for the given level.
func (l *level[T]) numNodes() int {
if l == nil {
return 0
}
num := len(l.nodes)
if l.pwc != nil {
num++
@@ -489,39 +492,49 @@ func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T
if len(nsubj) > 0 {
nsubj = append(subj, '.')
}
switch {
case r.fwc != nil:
if r.fwc != nil {
// We've reached a full wildcard, do a FWC match on the stree at this point
// and don't keep iterating downward.
nsubj := append(nsubj, '>')
st.Match(nsubj, cb)
case r.pwc != nil:
return
}
if r.pwc != nil {
// We've found a partial wildcard. We'll keep iterating downwards, but first
// check whether there's interest at this level (without triggering dupes) and
// match if so.
var done bool
nsubj := append(nsubj, '*')
if len(r.pwc.subs) > 0 {
st.Match(nsubj, cb)
done = true
}
if r.pwc.next != nil && r.pwc.next.numNodes() > 0 {
if r.pwc.next.numNodes() > 0 {
intersectStree(st, r.pwc.next, nsubj, cb)
}
default:
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
nsubj := append(nsubj, t...)
if len(n.subs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
if done {
return
}
}
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
// A wildcard at the next level will already visit these descendents
// so skip so we don't callback the same subject more than once.
continue
}
nsubj := append(nsubj, t...)
if len(n.subs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
}
if n.next != nil && n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
if n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
}

View File

@@ -1348,8 +1348,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
var offlineReason string
if !supported {
apiLevel := getRequiredApiLevel(cfg.Metadata)
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel)
if strictErr != nil {
offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: "))
} else {
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
}
s.Warnf(" Detected unsupported stream '%s > %s': %s", a.Name, cfg.StreamConfig.Name, offlineReason)
} else {
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr)
@@ -1571,8 +1575,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
var offlineReason string
if !supported {
apiLevel := getRequiredApiLevel(cfg.Metadata)
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel)
if strictErr != nil {
offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: "))
} else {
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
}
s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, e.mset.name(), cfg.Name, offlineReason)
} else {
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr)
@@ -1582,7 +1590,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
if !e.mset.closed.Load() {
s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name())
e.mset.mu.Lock()
e.mset.offlineReason = "stopped"
e.mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name)
e.mset.mu.Unlock()
e.mset.stop(false, false)
}

View File

@@ -2655,15 +2655,16 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac
}
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
if js == nil || cc == nil {
return
}
// Extra checks here but only leader is listening.
js.mu.RLock()
isLeader := cc.isLeader()
meta := cc.meta
js.mu.RUnlock()
// Extra checks here but only leader is listening.
if !isLeader {
return
}
@@ -2690,7 +2691,7 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac
var found string
js.mu.RLock()
for _, p := range cc.meta.Peers() {
for _, p := range meta.Peers() {
// If Peer is specified, it takes precedence
if req.Peer != _EMPTY_ {
if p.ID == req.Peer {
@@ -2715,7 +2716,7 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac
// So we have a valid peer.
js.mu.Lock()
cc.meta.ProposeRemovePeer(found)
meta.ProposeRemovePeer(found)
js.mu.Unlock()
resp.Success = true
@@ -2766,7 +2767,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
}
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
if js == nil || cc == nil {
return
}
@@ -2930,7 +2931,7 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli
}
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
if js == nil || cc == nil {
return
}
@@ -3090,7 +3091,13 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
}
_, cc := s.getJetStreamCluster()
if cc == nil || cc.meta == nil || !cc.isLeader() {
js.mu.RLock()
isLeader := cc.isLeader()
meta := cc.meta
js.mu.RUnlock()
if !isLeader {
return
}
@@ -3108,11 +3115,11 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
for _, oca := range osa.consumers {
oca.deleted = true
ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client}
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
meta.Propose(encodeDeleteConsumerAssignment(ca))
nc++
}
sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client}
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
meta.Propose(encodeDeleteStreamAssignment(sa))
ns++
}
js.mu.RUnlock()
@@ -3143,13 +3150,14 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
}
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
if js == nil || cc == nil {
return
}
// Extra checks here but only leader is listening.
js.mu.RLock()
isLeader := cc.isLeader()
meta := cc.meta
js.mu.RUnlock()
if !isLeader {
@@ -3171,14 +3179,14 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(cc.meta, req.Placement); resp.Error != nil {
if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil {
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
// Call actual stepdown.
err = cc.meta.StepDown(preferredLeader)
err = meta.StepDown(preferredLeader)
if err != nil {
resp.Error = NewJSRaftGeneralError(err, Unless(err))
} else {
@@ -3931,6 +3939,8 @@ func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError
if apiErr != nil {
return apiErr
}
jsa.js.mu.RLock()
defer jsa.js.mu.RUnlock()
jsa.mu.RLock()
defer jsa.mu.RUnlock()
if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
@@ -4971,6 +4981,10 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
meta := cc.meta
js.mu.RUnlock()
if meta == nil {
return
}
// Since these could wait on the Raft group lock, don't do so under the JS lock.
ourID := meta.ID()
groupLeaderless := meta.Leaderless()
@@ -5297,6 +5311,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
nca := *ca
ncfg := *ca.Config
nca.Config = &ncfg
meta := cc.meta
js.mu.RUnlock()
pauseUTC := req.PauseUntil.UTC()
if !pauseUTC.IsZero() {
@@ -5310,7 +5325,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
setStaticConsumerMetadata(nca.Config)
eca := encodeAddConsumerAssignment(&nca)
cc.meta.Propose(eca)
meta.Propose(eca)
resp.PauseUntil = pauseUTC
if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {

View File

@@ -303,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal(
if msgId := getMsgId(hdr); msgId != _EMPTY_ {
// Dedupe if staged.
if _, ok = diff.msgIds[msgId]; ok {
return hdr, msg, 0, nil, errMsgIdDuplicate
return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
}
mset.ddMu.Lock()
if dde := mset.checkMsgId(msgId); dde != nil {
@@ -311,7 +311,7 @@ func checkMsgHeadersPreClusteredProposal(
mset.ddMu.Unlock()
// Should not return an invalid sequence, in that case error.
if seq > 0 {
return hdr, msg, seq, nil, errMsgIdDuplicate
return hdr, msg, seq, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
} else {
return hdr, msg, 0, NewJSStreamDuplicateMessageConflictError(), errMsgIdDuplicate
}

View File

@@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/rand"
"os"
@@ -33,6 +32,7 @@ import (
"sync/atomic"
"time"
"github.com/antithesishq/antithesis-sdk-go/assert"
"github.com/klauspost/compress/s2"
"github.com/minio/highwayhash"
"github.com/nats-io/nuid"
@@ -136,14 +136,15 @@ type raftGroup struct {
// streamAssignment is what the meta controller uses to assign streams to peers.
type streamAssignment struct {
Client *ClientInfo `json:"client,omitempty"`
Created time.Time `json:"created"`
Config *StreamConfig `json:"stream"`
Group *raftGroup `json:"group"`
Sync string `json:"sync"`
Subject string `json:"subject,omitempty"`
Reply string `json:"reply,omitempty"`
Restore *StreamState `json:"restore_state,omitempty"`
Client *ClientInfo `json:"client,omitempty"`
Created time.Time `json:"created"`
ConfigJSON json.RawMessage `json:"stream"`
Config *StreamConfig `json:"-"`
Group *raftGroup `json:"group"`
Sync string `json:"sync"`
Subject string `json:"subject,omitempty"`
Reply string `json:"reply,omitempty"`
Restore *StreamState `json:"restore_state,omitempty"`
// Internal
consumers map[string]*consumerAssignment
responded bool
@@ -155,22 +156,26 @@ type streamAssignment struct {
}
type unsupportedStreamAssignment struct {
json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level.
reason string
info StreamInfo
sysc *client
infoSub *subscription
}
func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte) *unsupportedStreamAssignment {
func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, err error) *unsupportedStreamAssignment {
reason := "stopped"
if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) {
if err != nil {
if errstr := err.Error(); strings.HasPrefix(errstr, "json:") {
reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: "))
} else {
reason = fmt.Sprintf("stopped - %s", errstr)
}
} else if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) {
if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ {
reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel)
}
}
return &unsupportedStreamAssignment{
json: json,
reason: reason,
info: StreamInfo{
Created: sa.Created,
@@ -215,15 +220,16 @@ func (usa *unsupportedStreamAssignment) closeInfoSub(s *Server) {
// consumerAssignment is what the meta controller uses to assign consumers to streams.
type consumerAssignment struct {
Client *ClientInfo `json:"client,omitempty"`
Created time.Time `json:"created"`
Name string `json:"name"`
Stream string `json:"stream"`
Config *ConsumerConfig `json:"consumer"`
Group *raftGroup `json:"group"`
Subject string `json:"subject,omitempty"`
Reply string `json:"reply,omitempty"`
State *ConsumerState `json:"state,omitempty"`
Client *ClientInfo `json:"client,omitempty"`
Created time.Time `json:"created"`
Name string `json:"name"`
Stream string `json:"stream"`
ConfigJSON json.RawMessage `json:"consumer"`
Config *ConsumerConfig `json:"-"`
Group *raftGroup `json:"group"`
Subject string `json:"subject,omitempty"`
Reply string `json:"reply,omitempty"`
State *ConsumerState `json:"state,omitempty"`
// Internal
responded bool
recovering bool
@@ -234,22 +240,26 @@ type consumerAssignment struct {
}
type unsupportedConsumerAssignment struct {
json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level.
reason string
info ConsumerInfo
sysc *client
infoSub *subscription
}
func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsupportedConsumerAssignment {
func newUnsupportedConsumerAssignment(ca *consumerAssignment, err error) *unsupportedConsumerAssignment {
reason := "stopped"
if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) {
if err != nil {
if errstr := err.Error(); strings.HasPrefix(errstr, "json:") {
reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: "))
} else {
reason = fmt.Sprintf("stopped - %s", errstr)
}
} else if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) {
if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ {
reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel)
}
}
return &unsupportedConsumerAssignment{
json: json,
reason: reason,
info: ConsumerInfo{
Stream: ca.Stream,
@@ -294,35 +304,13 @@ func (uca *unsupportedConsumerAssignment) closeInfoSub(s *Server) {
}
type writeableConsumerAssignment struct {
consumerAssignment
// Internal
unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level.
}
func (wca *writeableConsumerAssignment) MarshalJSON() ([]byte, error) {
if wca.unsupportedJson != nil {
return wca.unsupportedJson, nil
}
return json.Marshal(wca.consumerAssignment)
}
func (wca *writeableConsumerAssignment) UnmarshalJSON(data []byte) error {
var unsupported bool
var ca consumerAssignment
decoder := json.NewDecoder(bytes.NewReader(data))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&ca); err != nil {
unsupported = true
ca = consumerAssignment{}
if err = json.Unmarshal(data, &ca); err != nil {
return err
}
}
wca.consumerAssignment = ca
if unsupported || (wca.Config != nil && !supportsRequiredApiLevel(wca.Config.Metadata)) {
wca.unsupportedJson = data
}
return nil
Client *ClientInfo `json:"client,omitempty"`
Created time.Time `json:"created"`
Name string `json:"name"`
Stream string `json:"stream"`
ConfigJSON json.RawMessage `json:"consumer"`
Group *raftGroup `json:"group"`
State *ConsumerState `json:"state,omitempty"`
}
// streamPurge is what the stream leader will replicate when purging a stream.
@@ -1437,13 +1425,13 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
if didSnap, didStreamRemoval, _, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
if didSnap, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
var nb uint64
// Some entries can fail without an error when shutting down, don't move applied forward.
if !js.isShuttingDown() {
_, nb = n.Applied(ce.Index)
}
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) {
doSnapshot()
} else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
@@ -1542,44 +1530,12 @@ func (js *jetStream) checkClusterSize() {
// Represents our stable meta state that we can write out.
type writeableStreamAssignment struct {
backingStreamAssignment
// Internal
unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level.
}
type backingStreamAssignment struct {
Client *ClientInfo `json:"client,omitempty"`
Created time.Time `json:"created"`
Config *StreamConfig `json:"stream"`
Group *raftGroup `json:"group"`
Sync string `json:"sync"`
Consumers []*writeableConsumerAssignment
}
func (wsa *writeableStreamAssignment) MarshalJSON() ([]byte, error) {
if wsa.unsupportedJson != nil {
return wsa.unsupportedJson, nil
}
return json.Marshal(wsa.backingStreamAssignment)
}
func (wsa *writeableStreamAssignment) UnmarshalJSON(data []byte) error {
var unsupported bool
var bsa backingStreamAssignment
decoder := json.NewDecoder(bytes.NewReader(data))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&bsa); err != nil {
unsupported = true
bsa = backingStreamAssignment{}
if err = json.Unmarshal(data, &bsa); err != nil {
return err
}
}
wsa.backingStreamAssignment = bsa
if unsupported || (wsa.Config != nil && !supportsRequiredApiLevel(wsa.Config.Metadata)) {
wsa.unsupportedJson = data
}
return nil
Client *ClientInfo `json:"client,omitempty"`
Created time.Time `json:"created"`
ConfigJSON json.RawMessage `json:"stream"`
Group *raftGroup `json:"group"`
Sync string `json:"sync"`
Consumers []*writeableConsumerAssignment
}
func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConfig, bool) {
@@ -1604,19 +1560,13 @@ func (js *jetStream) metaSnapshot() ([]byte, error) {
streams := make([]writeableStreamAssignment, 0, nsa)
for _, asa := range cc.streams {
for _, sa := range asa {
if sa.unsupported != nil && sa.unsupported.json != nil {
streams = append(streams, writeableStreamAssignment{unsupportedJson: sa.unsupported.json})
continue
}
wsa := writeableStreamAssignment{
backingStreamAssignment: backingStreamAssignment{
Client: sa.Client.forAssignmentSnap(),
Created: sa.Created,
Config: sa.Config,
Group: sa.Group,
Sync: sa.Sync,
Consumers: make([]*writeableConsumerAssignment, 0, len(sa.consumers)),
},
Client: sa.Client.forAssignmentSnap(),
Created: sa.Created,
ConfigJSON: sa.ConfigJSON,
Group: sa.Group,
Sync: sa.Sync,
Consumers: make([]*writeableConsumerAssignment, 0, len(sa.consumers)),
}
for _, ca := range sa.consumers {
// Skip if the consumer is pending, we can't include it in our snapshot.
@@ -1624,16 +1574,16 @@ func (js *jetStream) metaSnapshot() ([]byte, error) {
if ca.pending {
continue
}
if ca.unsupported != nil && ca.unsupported.json != nil {
wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{unsupportedJson: ca.unsupported.json})
nca++
continue
wca := writeableConsumerAssignment{
Client: ca.Client.forAssignmentSnap(),
Created: ca.Created,
Name: ca.Name,
Stream: ca.Stream,
ConfigJSON: ca.ConfigJSON,
Group: ca.Group,
State: ca.State,
}
cca := *ca
cca.Stream = wsa.Config.Name // Needed for safe roll-backs.
cca.Client = cca.Client.forAssignmentSnap()
cca.Subject, cca.Reply = _EMPTY_, _EMPTY_
wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{consumerAssignment: cca})
wsa.Consumers = append(wsa.Consumers, &wca)
nca++
}
streams = append(streams, wsa)
@@ -1685,30 +1635,25 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
// Build our new version here outside of js.
streams := make(map[string]map[string]*streamAssignment)
for _, wsa := range wsas {
fixCfgMirrorWithDedupWindow(wsa.Config)
as := streams[wsa.Client.serviceAccount()]
if as == nil {
as = make(map[string]*streamAssignment)
streams[wsa.Client.serviceAccount()] = as
}
sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, Config: wsa.Config, Group: wsa.Group, Sync: wsa.Sync}
if wsa.unsupportedJson != nil {
sa.unsupported = newUnsupportedStreamAssignment(js.srv, sa, wsa.unsupportedJson)
}
sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, ConfigJSON: wsa.ConfigJSON, Group: wsa.Group, Sync: wsa.Sync}
decodeStreamAssignmentConfig(js.srv, sa)
if len(wsa.Consumers) > 0 {
sa.consumers = make(map[string]*consumerAssignment)
for _, wca := range wsa.Consumers {
if wca.Stream == _EMPTY_ {
wca.Stream = sa.Config.Name // Rehydrate from the stream name.
}
ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, Config: wca.Config, Group: wca.Group, Subject: wca.Subject, Reply: wca.Reply, State: wca.State}
if wca.unsupportedJson != nil {
ca.unsupported = newUnsupportedConsumerAssignment(ca, wca.unsupportedJson)
}
ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, ConfigJSON: wca.ConfigJSON, Group: wca.Group, State: wca.State}
decodeConsumerAssignmentConfig(ca)
sa.consumers[ca.Name] = ca
}
}
as[wsa.Config.Name] = sa
as[sa.Config.Name] = sa
}
js.mu.Lock()
@@ -2065,8 +2010,8 @@ func (ca *consumerAssignment) recoveryKey() string {
return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name
}
func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
var didSnap, didRemoveStream, didRemoveConsumer bool
func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, error) {
var didSnap bool
isRecovering := js.isMetaRecovering()
for _, e := range entries {
@@ -2088,7 +2033,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
@@ -2102,7 +2047,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
@@ -2114,13 +2059,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.removeConsumers, key)
} else {
js.processStreamRemoval(sa)
didRemoveStream = true
}
case assignConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -2140,7 +2084,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -2160,7 +2104,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
@@ -2175,13 +2119,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
} else {
js.processConsumerRemoval(ca)
didRemoveConsumer = true
}
case updateStreamOp:
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
@@ -2191,16 +2134,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.removeStreams, key)
} else {
js.processUpdateStreamAssignment(sa)
// Since an update can be lowering replica count, we want upper layer to treat
// similar to a removal and snapshot to collapse old entries.
didRemoveStream = true
}
default:
panic(fmt.Sprintf("JetStream Cluster Unknown meta entry op type: %v", entryOp(buf[0])))
}
}
}
return didSnap, didRemoveStream, didRemoveConsumer, nil
return didSnap, nil
}
func (rg *raftGroup) isMember(id string) bool {
@@ -2710,7 +2650,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
batch := mset.batchApply
mset.mu.RUnlock()
if batch != nil {
mset.srv.Debugf("[batch] reject %s - empty entry", batch.id)
batch.rejectBatchState(mset)
}
}
@@ -2746,6 +2685,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
mset.retryMirrorConsumer()
continue
}
// If the error signals we timed out of a snapshot, we should try to replay the snapshot
// instead of fully resetting the state. Resetting the clustered state may result in
// race conditions and should only be used as a last effort attempt.
if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries {
if node := mset.raftNode(); node != nil && node.DrainAndReplaySnapshot() {
break
}
}
// We will attempt to reset our cluster state.
if mset.resetClusteredState(err) {
aq.recycle(&ces)
@@ -3096,10 +3043,16 @@ func (mset *stream) isMigrating() bool {
// resetClusteredState is called when a clustered stream had an error (e.g sequence mismatch, bad snapshot) and needs to be reset.
func (mset *stream) resetClusteredState(err error) bool {
mset.mu.RLock()
s, js, jsa, sa, acc, node := mset.srv, mset.js, mset.jsa, mset.sa, mset.acc, mset.node
s, js, jsa, sa, acc, node, name := mset.srv, mset.js, mset.jsa, mset.sa, mset.acc, mset.node, mset.nameLocked(false)
stype, tierName, replicas := mset.cfg.Storage, mset.tier, mset.cfg.Replicas
mset.mu.RUnlock()
assert.Unreachable("Reset clustered state", map[string]any{
"stream": name,
"account": acc.Name,
"err": err,
})
// The stream might already be deleted and not assigned to us anymore.
// In any case, don't revive the stream if it's already closed.
if mset.closed.Load() {
@@ -3408,7 +3361,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
panic(err.Error())
}
s, cc := js.server(), js.cluster
s := js.server()
var removed bool
if md.NoErase {
@@ -3417,9 +3370,18 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
removed, err = mset.eraseMsg(md.Seq)
}
// Cluster reset error.
var isLeader bool
if node := mset.raftNode(); node != nil && node.Leader() {
isLeader = true
}
if err == ErrStoreEOF {
return 0, err
if isLeader && !isRecovering {
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
s.sendAPIErrResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp))
}
continue
}
if err != nil && !isRecovering {
@@ -3427,10 +3389,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
md.Seq, md.Client.serviceAccount(), md.Stream, err)
}
js.mu.RLock()
isLeader := cc.isStreamLeader(md.Client.serviceAccount(), md.Stream)
js.mu.RUnlock()
if isLeader && !isRecovering {
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
if err != nil {
@@ -3601,7 +3559,7 @@ func (mset *stream) skipBatchIfRecovering(batch *batchApply, buf []byte) (bool,
// We can skip if we know this is less than what we already have.
if lseq-clfs < last {
mset.srv.Debugf("[batch] Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
mset.srv.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
mset.accountLocked(false), mset.nameLocked(false), lseq+1-clfs, last)
// Check for any preAcks in case we are interest based.
mset.clearAllPreAcks(lseq + 1 - clfs)
@@ -3684,7 +3642,7 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
// Messages to be skipped have no subject or timestamp or msg or hdr.
if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
// Skip and update our lseq.
last := mset.store.SkipMsg()
last, _ := mset.store.SkipMsg(0)
if needLock {
mset.mu.Lock()
}
@@ -4057,8 +4015,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
// If unsupported, we can't register any further.
if sa.unsupported != nil {
sa.unsupported.setupInfoSub(s, sa)
apiLevel := getRequiredApiLevel(sa.Config.Metadata)
s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel)
s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason)
js.mu.Unlock()
// Need to stop the stream, we can't keep running with an old config.
@@ -4187,8 +4144,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
// If unsupported, we can't register any further.
if sa.unsupported != nil {
sa.unsupported.setupInfoSub(s, sa)
apiLevel := getRequiredApiLevel(sa.Config.Metadata)
s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel)
s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason)
js.mu.Unlock()
// Need to stop the stream, we can't keep running with an old config.
@@ -4869,12 +4825,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// If unsupported, we can't register any further.
if ca.unsupported != nil {
ca.unsupported.setupInfoSub(s, ca)
apiLevel := getRequiredApiLevel(ca.Config.Metadata)
s.Warnf("Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", accName, stream, ca.Name, apiLevel)
s.Warnf("Detected unsupported consumer '%s > %s > %s': %s", accName, stream, ca.Name, ca.unsupported.reason)
// Mark stream as unsupported as well
if sa.unsupported == nil {
sa.unsupported = newUnsupportedStreamAssignment(s, sa, nil)
sa.unsupported = newUnsupportedStreamAssignment(s, sa, fmt.Errorf("unsupported consumer %q", ca.Name))
}
sa.unsupported.setupInfoSub(s, sa)
js.mu.Unlock()
@@ -8021,6 +7976,7 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset
func encodeAddStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
csa.ConfigJSON, _ = json.Marshal(sa.Config)
var bb bytes.Buffer
bb.WriteByte(byte(assignStreamOp))
json.NewEncoder(&bb).Encode(csa)
@@ -8030,6 +7986,7 @@ func encodeAddStreamAssignment(sa *streamAssignment) []byte {
func encodeUpdateStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
csa.ConfigJSON, _ = json.Marshal(sa.Config)
var bb bytes.Buffer
bb.WriteByte(byte(updateStreamOp))
json.NewEncoder(&bb).Encode(csa)
@@ -8039,6 +7996,7 @@ func encodeUpdateStreamAssignment(sa *streamAssignment) []byte {
func encodeDeleteStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
csa.ConfigJSON, _ = json.Marshal(sa.Config)
var bb bytes.Buffer
bb.WriteByte(byte(removeStreamOp))
json.NewEncoder(&bb).Encode(csa)
@@ -8046,25 +8004,38 @@ func encodeDeleteStreamAssignment(sa *streamAssignment) []byte {
}
func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) {
var unsupported bool
var sa streamAssignment
decoder := json.NewDecoder(bytes.NewReader(buf))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&sa); err != nil {
unsupported = true
sa = streamAssignment{}
if err = json.Unmarshal(buf, &sa); err != nil {
return nil, err
}
if err := json.Unmarshal(buf, &sa); err != nil {
return nil, err
}
fixCfgMirrorWithDedupWindow(sa.Config)
if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) {
sa.unsupported = newUnsupportedStreamAssignment(s, &sa, copyBytes(buf))
if err := decodeStreamAssignmentConfig(s, &sa); err != nil {
return nil, err
}
return &sa, nil
}
func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error {
var unsupported bool
var cfg StreamConfig
var err error
decoder := json.NewDecoder(bytes.NewReader(sa.ConfigJSON))
decoder.DisallowUnknownFields()
if err = decoder.Decode(&cfg); err != nil {
unsupported = true
cfg = StreamConfig{}
if err2 := json.Unmarshal(sa.ConfigJSON, &cfg); err2 != nil {
return err2
}
}
sa.Config = &cfg
fixCfgMirrorWithDedupWindow(sa.Config)
if unsupported || err != nil || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) {
sa.unsupported = newUnsupportedStreamAssignment(s, sa, err)
}
return nil
}
func encodeDeleteRange(dr *DeleteRange) []byte {
var bb bytes.Buffer
bb.WriteByte(byte(deleteRangeOp))
@@ -8480,6 +8451,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
func encodeAddConsumerAssignment(ca *consumerAssignment) []byte {
cca := *ca
cca.Client = cca.Client.forProposal()
cca.ConfigJSON, _ = json.Marshal(ca.Config)
var bb bytes.Buffer
bb.WriteByte(byte(assignConsumerOp))
json.NewEncoder(&bb).Encode(cca)
@@ -8489,6 +8461,7 @@ func encodeAddConsumerAssignment(ca *consumerAssignment) []byte {
func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte {
cca := *ca
cca.Client = cca.Client.forProposal()
cca.ConfigJSON, _ = json.Marshal(ca.Config)
var bb bytes.Buffer
bb.WriteByte(byte(removeConsumerOp))
json.NewEncoder(&bb).Encode(cca)
@@ -8496,27 +8469,40 @@ func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte {
}
func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) {
var unsupported bool
var ca consumerAssignment
decoder := json.NewDecoder(bytes.NewReader(buf))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&ca); err != nil {
unsupported = true
ca = consumerAssignment{}
if err = json.Unmarshal(buf, &ca); err != nil {
return nil, err
}
if err := json.Unmarshal(buf, &ca); err != nil {
return nil, err
}
if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) {
ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(buf))
if err := decodeConsumerAssignmentConfig(&ca); err != nil {
return nil, err
}
return &ca, nil
}
func decodeConsumerAssignmentConfig(ca *consumerAssignment) error {
var unsupported bool
var cfg ConsumerConfig
var err error
decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON))
decoder.DisallowUnknownFields()
if err = decoder.Decode(&cfg); err != nil {
unsupported = true
cfg = ConsumerConfig{}
if err2 := json.Unmarshal(ca.ConfigJSON, &cfg); err2 != nil {
return err2
}
}
ca.Config = &cfg
if unsupported || err != nil || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) {
ca.unsupported = newUnsupportedConsumerAssignment(ca, err)
}
return nil
}
func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte {
cca := *ca
cca.Client = cca.Client.forProposal()
cca.ConfigJSON, _ = json.Marshal(ca.Config)
var bb bytes.Buffer
bb.WriteByte(byte(assignCompressedConsumerOp))
s2e := s2.NewWriter(&bb)
@@ -8526,32 +8512,16 @@ func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte {
}
func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) {
var unsupported bool
var ca consumerAssignment
bb := bytes.NewBuffer(buf)
s2d := s2.NewReader(bb)
decoder := json.NewDecoder(s2d)
decoder.DisallowUnknownFields()
if err := decoder.Decode(&ca); err != nil {
unsupported = true
ca = consumerAssignment{}
bb = bytes.NewBuffer(buf)
s2d = s2.NewReader(bb)
if err = json.NewDecoder(s2d).Decode(&ca); err != nil {
return nil, err
}
return nil, err
}
if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) {
bb = bytes.NewBuffer(buf)
s2d = s2.NewReader(bb)
dec, err := io.ReadAll(s2d)
if err != nil {
return nil, err
}
ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(dec))
if err := decodeConsumerAssignmentConfig(&ca); err != nil {
return nil, err
}
return &ca, nil
}
@@ -8926,7 +8896,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
)
diff := &batchStagedDiff{}
if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
// TODO(mvv): reset in-memory expected header maps
mset.clMu.Unlock()
if err == errMsgIdDuplicate && dseq > 0 {
var buf [256]byte
@@ -8957,12 +8926,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
// Do proposal.
err = node.Propose(esm)
// TODO(mvv): reset in-memory expected header maps, if err!=nil
if err == nil {
mset.clseq++
mset.trackReplicationTraffic(node, len(esm), r)
}
_ = node.Propose(esm)
// The proposal can fail, but we always account for trying.
mset.clseq++
mset.trackReplicationTraffic(node, len(esm), r)
// Check to see if we are being overrun.
// TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
@@ -9176,6 +9143,14 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) (
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
mset.mu.Unlock()
// Always try to resume applies, we might be paused already if we timed out of processing the snapshot previously.
defer func() {
// Don't bother resuming if server or stream is gone.
if e != errCatchupStreamStopped && e != ErrServerNotRunning {
n.ResumeApply()
}
}()
// Bug that would cause this to be empty on stream update.
if subject == _EMPTY_ {
return errCatchupCorruptSnapshot
@@ -9191,13 +9166,6 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) (
return err
}
defer func() {
// Don't bother resuming if server or stream is gone.
if e != errCatchupStreamStopped && e != ErrServerNotRunning {
n.ResumeApply()
}
}()
// Set our catchup state.
mset.setCatchingUp()
defer mset.clearCatchingUp()
@@ -9521,7 +9489,7 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
// Messages to be skipped have no subject or timestamp.
// TODO(dlc) - formalize with skipMsgOp
if subj == _EMPTY_ && ts == 0 {
if lseq := mset.store.SkipMsg(); lseq != seq {
if _, err = mset.store.SkipMsg(seq); err != nil {
return 0, errCatchupWrongSeqForSkip
}
} else if err := mset.store.StoreRawMsg(subj, hdr, msg, seq, ts, ttl); err != nil {

View File

@@ -8,12 +8,18 @@ const (
// JSAccountResourcesExceededErr resource limits exceeded for account
JSAccountResourcesExceededErr ErrorIdentifier = 10002
// JSAtomicPublishContainsDuplicateMessageErr atomic publish batch contains duplicate message id
JSAtomicPublishContainsDuplicateMessageErr ErrorIdentifier = 10201
// JSAtomicPublishDisabledErr atomic publish is disabled
JSAtomicPublishDisabledErr ErrorIdentifier = 10174
// JSAtomicPublishIncompleteBatchErr atomic publish batch is incomplete
JSAtomicPublishIncompleteBatchErr ErrorIdentifier = 10176
// JSAtomicPublishInvalidBatchCommitErr atomic publish batch commit is invalid
JSAtomicPublishInvalidBatchCommitErr ErrorIdentifier = 10200
// JSAtomicPublishInvalidBatchIDErr atomic publish batch ID is invalid
JSAtomicPublishInvalidBatchIDErr ErrorIdentifier = 10179
@@ -603,8 +609,10 @@ const (
var (
ApiErrors = map[ErrorIdentifier]*ApiError{
JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"},
JSAtomicPublishContainsDuplicateMessageErr: {Code: 400, ErrCode: 10201, Description: "atomic publish batch contains duplicate message id"},
JSAtomicPublishDisabledErr: {Code: 400, ErrCode: 10174, Description: "atomic publish is disabled"},
JSAtomicPublishIncompleteBatchErr: {Code: 400, ErrCode: 10176, Description: "atomic publish batch is incomplete"},
JSAtomicPublishInvalidBatchCommitErr: {Code: 400, ErrCode: 10200, Description: "atomic publish batch commit is invalid"},
JSAtomicPublishInvalidBatchIDErr: {Code: 400, ErrCode: 10179, Description: "atomic publish batch ID is invalid"},
JSAtomicPublishMissingSeqErr: {Code: 400, ErrCode: 10175, Description: "atomic publish sequence is missing"},
JSAtomicPublishTooLargeBatchErrF: {Code: 400, ErrCode: 10199, Description: "atomic publish batch is too large: {size}"},
@@ -835,6 +843,16 @@ func NewJSAccountResourcesExceededError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSAccountResourcesExceededErr]
}
// NewJSAtomicPublishContainsDuplicateMessageError creates a new JSAtomicPublishContainsDuplicateMessageErr error: "atomic publish batch contains duplicate message id"
func NewJSAtomicPublishContainsDuplicateMessageError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
return ApiErrors[JSAtomicPublishContainsDuplicateMessageErr]
}
// NewJSAtomicPublishDisabledError creates a new JSAtomicPublishDisabledErr error: "atomic publish is disabled"
func NewJSAtomicPublishDisabledError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
@@ -855,6 +873,16 @@ func NewJSAtomicPublishIncompleteBatchError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSAtomicPublishIncompleteBatchErr]
}
// NewJSAtomicPublishInvalidBatchCommitError creates a new JSAtomicPublishInvalidBatchCommitErr error: "atomic publish batch commit is invalid"
func NewJSAtomicPublishInvalidBatchCommitError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
return ApiErrors[JSAtomicPublishInvalidBatchCommitErr]
}
// NewJSAtomicPublishInvalidBatchIDError creates a new JSAtomicPublishInvalidBatchIDErr error: "atomic publish batch ID is invalid"
func NewJSAtomicPublishInvalidBatchIDError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)

View File

@@ -76,7 +76,7 @@ type leaf struct {
isSpoke bool
// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
remoteCluster string
// remoteServer holds onto the remove server's name or ID.
// remoteServer holds onto the remote server's name or ID.
remoteServer string
// domain name of remote server
remoteDomain string
@@ -285,6 +285,11 @@ func validateLeafNode(o *Options) error {
// If a remote has a websocket scheme, all need to have it.
for _, rcfg := range o.LeafNode.Remotes {
// Validate proxy configuration
if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
return err
}
if len(rcfg.URLs) >= 2 {
firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
for i := 1; i < len(rcfg.URLs); i++ {
@@ -369,6 +374,60 @@ func validateLeafNodeAuthOptions(o *Options) error {
return nil
}
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
var warnings []string
if remote.Proxy.URL == _EMPTY_ {
return warnings, nil
}
proxyURL, err := url.Parse(remote.Proxy.URL)
if err != nil {
return warnings, fmt.Errorf("invalid proxy URL: %v", err)
}
if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
}
if proxyURL.Host == _EMPTY_ {
return warnings, fmt.Errorf("proxy URL must specify a host")
}
if remote.Proxy.Timeout < 0 {
return warnings, fmt.Errorf("proxy timeout must be >= 0")
}
if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
}
if len(remote.URLs) > 0 {
hasWebSocketURL := false
hasNonWebSocketURL := false
for _, remoteURL := range remote.URLs {
if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
hasWebSocketURL = true
if (remoteURL.Scheme == wsSchemePrefixTLS) &&
remote.TLSConfig == nil && !remote.TLS {
return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
}
} else {
hasNonWebSocketURL = true
}
}
if !hasWebSocketURL {
warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
} else if hasNonWebSocketURL {
warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
}
}
return warnings, nil
}
// Update remote LeafNode TLS configurations after a config reload.
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
max := len(opts.LeafNode.Remotes)
@@ -502,6 +561,67 @@ func (s *Server) setLeafNodeNonExportedOptions() {
const sharedSysAccDelay = 250 * time.Millisecond
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
proxyAddr, err := url.Parse(proxyURL)
if err != nil {
// This should not happen since proxy URL is validated during configuration parsing
return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
}
// Connect to the proxy server
conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
if err != nil {
return nil, fmt.Errorf("failed to connect to proxy: %v", err)
}
// Set deadline for the entire proxy handshake
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to set deadline: %v", err)
}
req := &http.Request{
Method: http.MethodConnect,
URL: &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
Host: targetHost,
Header: make(http.Header),
}
// Add proxy authentication if provided
if username != "" && password != "" {
req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
}
if err := req.Write(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
}
resp, err := http.ReadResponse(bufio.NewReader(conn), req)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to read proxy response: %v", err)
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
conn.Close()
return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
}
// Close the response body
resp.Body.Close()
// Clear the deadline now that we've finished the proxy handshake
if err := conn.SetDeadline(time.Time{}); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to clear deadline: %v", err)
}
return conn, nil
}
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
defer s.grWG.Done()
@@ -541,6 +661,19 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
// Capture proxy configuration once before the loop with proper locking
remote.RLock()
proxyURL := remote.Proxy.URL
proxyUsername := remote.Proxy.Username
proxyPassword := remote.Proxy.Password
proxyTimeout := remote.Proxy.Timeout
remote.RUnlock()
// Set default proxy timeout if not specified
if proxyTimeout == 0 {
proxyTimeout = dialTimeout
}
attempts := 0
for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
@@ -557,7 +690,25 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
err = ErrLeafNodeDisabled
} else {
s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
conn, err = natsDialTimeout("tcp", url, dialTimeout)
// Check if proxy is configured first, then check if URL supports it
if proxyURL != _EMPTY_ && isWSURL(rURL) {
// Use proxy for WebSocket connections - use original hostname, resolved IP for connection
targetHost := rURL.Host
// If URL doesn't include port, add the default port for the scheme
if rURL.Port() == _EMPTY_ {
defaultPort := "80"
if rURL.Scheme == wsSchemePrefixTLS {
defaultPort = "443"
}
targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
}
conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
} else {
// Direct connection
conn, err = natsDialTimeout("tcp", url, dialTimeout)
}
}
}
if err != nil {
@@ -1287,6 +1438,13 @@ func (c *client) processLeafnodeInfo(info *Info) {
// otherwise if there is no TLS configuration block for the remote,
// the solicit side will not attempt to perform the TLS handshake.
if firstINFO && info.TLSRequired {
// Check for TLS/proxy configuration mismatch
if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
c.mu.Unlock()
c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
c.closeConnection(TLSHandshakeError)
return
}
remote.TLS = true
}
if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
@@ -2040,7 +2198,7 @@ func (s *Server) checkInternalSyncConsumers(acc *Account) {
// We will check all streams in our local account. They must be a leader and
// be sourcing or mirroring. We will check the external config on the stream itself
// if this is cross domain, or if the remote domain is empty, meaning we might be
// extedning the system across this leafnode connection and hence we would be extending
// extending the system across this leafnode connection and hence we would be extending
// our own domain.
jsa := js.lookupAccount(acc)
if jsa == nil {

View File

@@ -245,6 +245,10 @@ func (s *Server) RateLimitDebugf(format string, v ...any) {
// Fatalf logs a fatal error
func (s *Server) Fatalf(format string, v ...any) {
if s.isShuttingDown() {
s.Errorf(format, v)
return
}
s.executeLogCall(func(logger Logger, format string, v ...any) {
logger.Fatalf(format, v...)
}, format, v...)

View File

@@ -42,7 +42,9 @@ type memStore struct {
scb StorageUpdateHandler
rmcb StorageRemoveMsgHandler
pmsgcb ProcessJetStreamMsgHandler
ageChk *time.Timer
ageChk *time.Timer // Timer to expire messages.
ageChkRun bool // Whether message expiration is currently running.
ageChkTime int64 // When the message expiration is scheduled to run.
consumers int
receivedAny bool
ttls *thw.HashWheel
@@ -113,6 +115,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
if ms.ageChk != nil && ms.cfg.MaxAge == 0 {
ms.ageChk.Stop()
ms.ageChk = nil
ms.ageChkTime = 0
}
// Make sure to update MaxMsgsPer
if cfg.MaxMsgsPer < -1 {
@@ -309,6 +312,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, tt
if ms.scheduling != nil {
if schedule, ok := getMessageSchedule(hdr); ok && !schedule.IsZero() {
ms.scheduling.add(seq, subj, schedule.UnixNano())
} else {
ms.scheduling.removeSubject(subj)
}
}
@@ -354,12 +359,21 @@ func (ms *memStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, i
}
// SkipMsg will use the next sequence number but not store anything.
func (ms *memStore) SkipMsg() uint64 {
func (ms *memStore) SkipMsg(seq uint64) (uint64, error) {
// Grab time.
now := time.Unix(0, ats.AccessTime()).UTC()
ms.mu.Lock()
seq := ms.state.LastSeq + 1
defer ms.mu.Unlock()
// Check sequence matches our last sequence.
if seq != ms.state.LastSeq+1 {
if seq > 0 {
return 0, ErrSequenceMismatch
}
seq = ms.state.LastSeq + 1
}
ms.state.LastSeq = seq
ms.state.LastTime = now
if ms.state.Msgs == 0 {
@@ -368,8 +382,7 @@ func (ms *memStore) SkipMsg() uint64 {
} else {
ms.dmap.Insert(seq)
}
ms.mu.Unlock()
return seq
return seq, nil
}
// Skip multiple msgs.
@@ -1062,6 +1075,12 @@ func (ms *memStore) startAgeChk() {
// Lock should be held.
func (ms *memStore) resetAgeChk(delta int64) {
// If we're already expiring messages, it will make sure to reset.
// Don't trigger again, as that could result in many expire goroutines.
if ms.ageChkRun {
return
}
var next int64 = math.MaxInt64
if ms.ttls != nil {
next = ms.ttls.GetNextExpiration(next)
@@ -1107,6 +1126,14 @@ func (ms *memStore) resetAgeChk(delta int64) {
fireIn = 250 * time.Millisecond
}
// If we want to kick the timer to run later than what was assigned before, don't reset it.
// Otherwise, we could get in a situation where the timer is continuously reset, and it never runs.
expires := ats.AccessTime() + fireIn.Nanoseconds()
if ms.ageChkTime > 0 && expires > ms.ageChkTime {
return
}
ms.ageChkTime = expires
if ms.ageChk != nil {
ms.ageChk.Reset(fireIn)
} else {
@@ -1119,6 +1146,7 @@ func (ms *memStore) cancelAgeChk() {
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil
ms.ageChkTime = 0
}
}
@@ -1126,17 +1154,22 @@ func (ms *memStore) cancelAgeChk() {
func (ms *memStore) expireMsgs() {
var smv StoreMsg
var sm *StoreMsg
ms.mu.RLock()
ms.mu.Lock()
maxAge := int64(ms.cfg.MaxAge)
minAge := time.Now().UnixNano() - maxAge
rmcb := ms.rmcb
pmsgcb := ms.pmsgcb
sdmTTL := int64(ms.cfg.SubjectDeleteMarkerTTL.Seconds())
sdmEnabled := sdmTTL > 0
ms.mu.RUnlock()
// If SDM is enabled, but handlers aren't set up yet. Try again later.
if sdmEnabled && (rmcb == nil || pmsgcb == nil) {
ms.resetAgeChk(0)
ms.mu.Unlock()
return
}
ms.ageChkRun = true
ms.mu.Unlock()
if maxAge > 0 {
var seq uint64
@@ -1150,7 +1183,7 @@ func (ms *memStore) expireMsgs() {
}
if sdmEnabled {
if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok {
sdm := last && isSubjectDeleteMarker(sm.hdr)
sdm := last && !isSubjectDeleteMarker(sm.hdr)
ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
@@ -1168,27 +1201,13 @@ func (ms *memStore) expireMsgs() {
// TODO: Not great that we're holding the lock here, but the timed hash wheel isn't thread-safe.
nextTTL := int64(math.MaxInt64)
var rmSeqs []uint64
var ttlSdm map[string][]SDMBySubj
var rmSeqs []thw.HashWheelEntry
if ms.ttls != nil {
ms.ttls.ExpireTasks(func(seq uint64, ts int64) bool {
if sdmEnabled {
// Need to grab subject for the specified sequence, and check
// if the message hasn't been removed in the meantime.
sm, _ = ms.loadMsgLocked(seq, &smv, false)
if sm != nil {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)})
return false
}
} else {
// Collect sequences to remove. Don't remove messages inline here,
// as that releases the lock and THW is not thread-safe.
rmSeqs = append(rmSeqs, seq)
}
return true
rmSeqs = append(rmSeqs, thw.HashWheelEntry{Seq: seq, Expires: ts})
// We might need to remove messages out of band, those can fail, and we can be shutdown halfway
// through so don't remove from THW just yet.
return false
})
if maxAge > 0 {
// Only check if we're expiring something in the next MaxAge interval, saves us a bit
@@ -1200,34 +1219,46 @@ func (ms *memStore) expireMsgs() {
}
// Remove messages collected by THW.
for _, seq := range rmSeqs {
ms.removeMsg(seq, false)
}
// THW is unordered, so must sort by sequence and must not be holding the lock.
if len(ttlSdm) > 0 {
if !sdmEnabled {
for _, rm := range rmSeqs {
ms.removeMsg(rm.Seq, false)
}
} else {
// THW is unordered, so must sort by sequence and must not be holding the lock.
ms.mu.Unlock()
for subj, es := range ttlSdm {
slices.SortFunc(es, func(a, b SDMBySubj) int {
if a.seq == b.seq {
return 0
} else if a.seq < b.seq {
return -1
} else {
return 1
}
})
for _, e := range es {
if last, ok := ms.shouldProcessSdm(e.seq, subj); ok {
sdm := last && !e.sdm
ms.handleRemovalOrSdm(e.seq, subj, sdm, sdmTTL)
}
slices.SortFunc(rmSeqs, func(a, b thw.HashWheelEntry) int {
if a.Seq == b.Seq {
return 0
} else if a.Seq < b.Seq {
return -1
} else {
return 1
}
})
for _, rm := range rmSeqs {
// Need to grab subject for the specified sequence if for SDM, and check
// if the message hasn't been removed in the meantime.
// We need to grab the message and check if we should process SDM while holding the lock,
// otherwise we can race if a deletion of this message is in progress.
ms.mu.Lock()
sm, _ = ms.loadMsgLocked(rm.Seq, &smv, false)
if sm == nil {
ms.ttls.Remove(rm.Seq, rm.Expires)
ms.mu.Unlock()
continue
}
last, ok := ms.shouldProcessSdmLocked(rm.Seq, sm.subj)
ms.mu.Unlock()
if ok {
sdm := last && !isSubjectDeleteMarker(sm.hdr)
ms.handleRemovalOrSdm(rm.Seq, sm.subj, sdm, sdmTTL)
}
}
ms.mu.Lock()
}
// Only cancel if no message left, not on potential lookup error that would result in sm == nil.
ms.ageChkRun, ms.ageChkTime = false, 0
if ms.state.Msgs == 0 && nextTTL == math.MaxInt64 {
ms.cancelAgeChk()
} else {
@@ -1242,12 +1273,20 @@ func (ms *memStore) expireMsgs() {
func (ms *memStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) {
ms.mu.Lock()
defer ms.mu.Unlock()
return ms.shouldProcessSdmLocked(seq, subj)
}
// Lock should be held.
func (ms *memStore) shouldProcessSdmLocked(seq uint64, subj string) (bool, bool) {
if ms.sdm == nil {
ms.sdm = newSDMMeta()
}
if p, ok := ms.sdm.pending[seq]; ok {
// Don't allow more proposals for the same sequence if we already did recently.
if time.Since(time.Unix(0, p.ts)) < 2*time.Second {
return p.last, false
}
// If we're about to use the cached value, and we knew it was last before,
// quickly check that we don't have more remaining messages for the subject now.
// Which means we are not the last anymore and must reset to not remove later data.
@@ -1258,11 +1297,6 @@ func (ms *memStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) {
p.last = false
}
}
// Don't allow more proposals for the same sequence if we already did recently.
if time.Since(time.Unix(0, p.ts)) < 2*time.Second {
return p.last, false
}
ms.sdm.pending[seq] = SDMBySeq{p.last, time.Now().UnixNano()}
return p.last, true
}
@@ -1302,9 +1336,15 @@ func (ms *memStore) runMsgScheduling() {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.scheduling == nil || ms.pmsgcb == nil {
// If scheduling is enabled, but handler isn't set up yet. Try again later.
if ms.scheduling == nil {
return
}
if ms.pmsgcb == nil {
ms.scheduling.resetTimer()
return
}
ms.scheduling.running = true
scheduledMsgs := ms.scheduling.getScheduledMessages(func(seq uint64, smv *StoreMsg) *StoreMsg {
sm, _ := ms.loadMsgLocked(seq, smv, false)
@@ -1318,9 +1358,8 @@ func (ms *memStore) runMsgScheduling() {
ms.mu.Lock()
}
if ms.scheduling != nil {
ms.scheduling.resetTimer()
}
ms.scheduling.running, ms.scheduling.deadline = false, 0
ms.scheduling.resetTimer()
}
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
@@ -1933,6 +1972,15 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
ms.dmap.Insert(seq)
ms.updateFirstSeq(seq)
// Remove any per subject tracking.
ms.removeSeqPerSubject(sm.subj, seq)
if ms.ttls != nil {
if ttl, err := getMessageTTL(sm.hdr); err == nil {
expires := time.Duration(sm.ts) + (time.Second * time.Duration(ttl))
ms.ttls.Remove(seq, int64(expires))
}
}
if secure {
if len(sm.hdr) > 0 {
sm.hdr = make([]byte, len(sm.hdr))
@@ -1945,9 +1993,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
sm.seq, sm.ts = 0, 0
}
// Remove any per subject tracking.
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
@@ -2056,6 +2101,7 @@ func (ms *memStore) Stop() error {
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil
ms.ageChkTime = 0
}
ms.msgs = nil
ms.mu.Unlock()

View File

@@ -2966,18 +2966,20 @@ type MetaClusterInfo struct {
// JSInfo has detailed information on JetStream.
type JSInfo struct {
JetStreamStats
ID string `json:"server_id"`
Now time.Time `json:"now"`
Disabled bool `json:"disabled,omitempty"`
Config JetStreamConfig `json:"config,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Messages uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
Total int `json:"total"`
ID string `json:"server_id"`
Now time.Time `json:"now"`
Disabled bool `json:"disabled,omitempty"`
Config JetStreamConfig `json:"config,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
Streams int `json:"streams"`
StreamsLeader int `json:"streams_leader,omitempty"`
Consumers int `json:"consumers"`
ConsumersLeader int `json:"consumers_leader,omitempty"`
Messages uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
Total int `json:"total"`
}
func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail {
@@ -3197,6 +3199,16 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
jsi.Messages += streamState.Msgs
jsi.Bytes += streamState.Bytes
jsi.Consumers += streamState.Consumers
if opts.RaftGroups {
if node := stream.raftNode(); node == nil || node.Leader() {
jsi.StreamsLeader++
}
for _, consumer := range stream.getPublicConsumers() {
if node := consumer.raftNode(); node == nil || node.Leader() {
jsi.ConsumersLeader++
}
}
}
}
}

View File

@@ -15,6 +15,7 @@ package server
import (
"context"
"crypto/fips140"
"crypto/tls"
"crypto/x509"
"errors"
@@ -83,6 +84,7 @@ type ClusterOpts struct {
Compression CompressionOpts `json:"-"`
PingInterval time.Duration `json:"-"`
MaxPingsOut int `json:"-"`
WriteDeadline time.Duration `json:"-"`
// Not exported (used in tests)
resolver netResolver
@@ -125,6 +127,7 @@ type GatewayOpts struct {
ConnectBackoff bool `json:"connect_backoff,omitempty"`
Gateways []*RemoteGatewayOpts `json:"gateways,omitempty"`
RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster
WriteDeadline time.Duration `json:"-"`
// Not exported, for tests.
resolver netResolver
@@ -175,6 +178,7 @@ type LeafNodeOpts struct {
Advertise string `json:"-"`
NoAdvertise bool `json:"-"`
ReconnectInterval time.Duration `json:"-"`
WriteDeadline time.Duration `json:"-"`
// Compression options
Compression CompressionOpts `json:"-"`
@@ -241,6 +245,18 @@ type RemoteLeafOpts struct {
NoMasking bool `json:"-"`
}
// HTTP Proxy configuration for WebSocket connections
Proxy struct {
// URL of the HTTP proxy server (e.g., "http://proxy.example.com:8080")
URL string `json:"-"`
// Username for proxy authentication
Username string `json:"-"`
// Password for proxy authentication
Password string `json:"-"`
// Timeout for proxy connection
Timeout time.Duration `json:"-"`
}
tlsConfigOpts *TLSConfigOpts
// If we are clustered and our local account has JetStream, if apps are accessing
@@ -1986,6 +2002,8 @@ func parseCluster(v any, opts *Options, errors *[]error, warnings *[]error) erro
}
case "ping_max":
opts.Cluster.MaxPingsOut = int(mv.(int64))
case "write_deadline":
opts.Cluster.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -2174,6 +2192,8 @@ func parseGateway(v any, o *Options, errors *[]error, warnings *[]error) error {
o.Gateway.Gateways = gateways
case "reject_unknown", "reject_unknown_cluster":
o.Gateway.RejectUnknown = mv.(bool)
case "write_deadline":
o.Gateway.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -2465,6 +2485,9 @@ func parseJetStreamTPM(v interface{}, opts *Options, errors *[]error) error {
func setJetStreamEkCipher(opts *Options, mv interface{}, tk token) error {
switch strings.ToLower(mv.(string)) {
case "chacha", "chachapoly":
if fips140.Enabled() {
return &configErr{tk, fmt.Sprintf("Cipher type %q cannot be used in FIPS-140 mode", mv)}
}
opts.JetStreamCipher = ChaCha
case "aes":
opts.JetStreamCipher = AES
@@ -2694,6 +2717,8 @@ func parseLeafNodes(v any, opts *Options, errors *[]error, warnings *[]error) er
}
case "isolate_leafnode_interest", "isolate":
opts.LeafNode.IsolateLeafnodeInterest = mv.(bool)
case "write_deadline":
opts.LeafNode.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -2990,6 +3015,48 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL
remote.FirstInfoTimeout = parseDuration(k, tk, v, errors, warnings)
case "disabled":
remote.Disabled = v.(bool)
case "proxy":
proxyMap, ok := v.(map[string]any)
if !ok {
*errors = append(*errors, &configErr{tk, fmt.Sprintf("Expected proxy to be a map, got %T", v)})
continue
}
// Capture the token for the "proxy" field itself, before the map iteration
proxyToken := tk
for pk, pv := range proxyMap {
tk, pv = unwrapValue(pv, &lt)
switch strings.ToLower(pk) {
case "url":
remote.Proxy.URL = pv.(string)
case "username":
remote.Proxy.Username = pv.(string)
case "password":
remote.Proxy.Password = pv.(string)
case "timeout":
remote.Proxy.Timeout = parseDuration("proxy timeout", tk, pv, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
field: pk,
configErr: configErr{
token: tk,
},
}
*errors = append(*errors, err)
continue
}
}
}
// Use the saved proxy token for validation errors, not the last field token
if warns, err := validateLeafNodeProxyOptions(remote); err != nil {
*errors = append(*errors, &configErr{proxyToken, err.Error()})
continue
} else {
// Add any warnings about proxy configuration
for _, warn := range warns {
*warnings = append(*warnings, &configErr{proxyToken, warn})
}
}
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -4315,6 +4382,10 @@ func parseAuthorization(v any, errors, warnings *[]error) (*authorization, error
}
auth.defaultPermissions = permissions
case "auth_callout", "auth_hook":
if fips140.Enabled() {
*errors = append(*errors, fmt.Errorf("'auth_callout' cannot be configured in FIPS-140 mode"))
continue
}
ac, err := parseAuthCallout(tk, errors)
if err != nil {
*errors = append(*errors, err)

View File

@@ -76,6 +76,7 @@ type RaftNode interface {
ApplyQ() *ipQueue[*CommittedEntry]
PauseApply() error
ResumeApply()
DrainAndReplaySnapshot() bool
LeadChangeC() <-chan bool
QuitC() <-chan struct{}
Created() time.Time
@@ -1040,10 +1041,13 @@ func (n *raft) PauseApply() error {
if n.State() == Leader {
return errAlreadyLeader
}
n.Lock()
defer n.Unlock()
n.pauseApplyLocked()
return nil
}
func (n *raft) pauseApplyLocked() {
// If we are currently a candidate make sure we step down.
if n.State() == Candidate {
n.stepdownLocked(noLeader)
@@ -1051,11 +1055,11 @@ func (n *raft) PauseApply() error {
n.debug("Pausing our apply channel")
n.paused = true
n.hcommit = n.commit
if n.hcommit < n.commit {
n.hcommit = n.commit
}
// Also prevent us from trying to become a leader while paused and catching up.
n.resetElect(observerModeInterval)
return nil
}
// ResumeApply will resume sending applies to the external apply queue. This
@@ -1107,6 +1111,25 @@ func (n *raft) ResumeApply() {
}
}
// DrainAndReplaySnapshot will drain the apply queue and replay the snapshot.
// Our highest known commit will be preserved by pausing applies. The caller
// should make sure to call ResumeApply() when handling the snapshot from the
// queue, which will populate the rest of the committed entries in the queue.
func (n *raft) DrainAndReplaySnapshot() bool {
n.Lock()
defer n.Unlock()
n.warn("Draining and replaying snapshot")
snap, err := n.loadLastSnapshot()
if err != nil {
return false
}
n.pauseApplyLocked()
n.apply.drain()
n.commit = snap.lastIndex
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
return true
}
// Applied is a callback that must be called by the upper layer when it
// has successfully applied the committed entries that it received from the
// apply queue. It will return the number of entries and an estimation of the
@@ -2706,7 +2729,6 @@ func (n *raft) runAsLeader() {
n.stepdown(noLeader)
return
}
n.trackPeer(vresp.peer)
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
if voteReq, ok := n.reqs.popOne(); ok {
@@ -3070,7 +3092,7 @@ func (n *raft) applyCommit(index uint64) error {
if lp, ok := n.peers[newPeer]; !ok {
// We are not tracking this one automatically so we need to bump cluster size.
n.peers[newPeer] = &lps{time.Now(), 0, true}
n.peers[newPeer] = &lps{time.Time{}, 0, true}
} else {
// Mark as added.
lp.kp = true
@@ -3470,6 +3492,17 @@ func (n *raft) updateLeader(newLeader string) {
}
}
}
// Reset last seen timestamps.
// If we're the leader we track everyone, and don't reset.
// But if we're a follower we only track the leader, and reset all others.
if newLeader != n.id {
for peer, ps := range n.peers {
if peer == newLeader {
continue
}
ps.ts = time.Time{}
}
}
}
// processAppendEntry will process an appendEntry. This is called either
@@ -3560,19 +3593,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// sub, rather than a catch-up sub.
isNew := sub != nil && sub == n.aesub
// Track leader directly
if isNew && ae.leader != noLeader {
if ps := n.peers[ae.leader]; ps != nil {
ps.ts = time.Now()
} else {
n.peers[ae.leader] = &lps{time.Now(), 0, true}
}
}
// If we are/were catching up ignore old catchup subs, but only if catching up from an older server
// that doesn't send the leader term when catching up. We can reject old catchups from newer subs
// later, just by checking the append entry is on the correct term.
if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) {
// that doesn't send the leader term when catching up or if we would truncate as a result.
// We can reject old catchups from newer subs later, just by checking the append entry is on the correct term.
if !isNew && sub != nil && (ae.lterm == 0 || ae.pindex < n.pindex) && (!catchingUp || sub != n.catchup.sub) {
n.Unlock()
n.debug("AppendEntry ignoring old entry from previous catchup")
return
@@ -3640,6 +3664,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.updateLeadChange(false)
}
// Track leader directly
// But, do so after all consistency checks so we don't track an old leader.
if isNew && ae.leader != noLeader && ae.leader == n.leader {
if ps := n.peers[ae.leader]; ps != nil {
ps.ts = time.Now()
} else {
n.peers[ae.leader] = &lps{time.Now(), 0, true}
}
}
if ae.pterm != n.pterm || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
@@ -3808,10 +3842,8 @@ CONTINUE:
case EntryAddPeer:
if newPeer := string(e.Data); len(newPeer) == idLen {
// Track directly, but wait for commit to be official
if ps := n.peers[newPeer]; ps != nil {
ps.ts = time.Now()
} else {
n.peers[newPeer] = &lps{time.Now(), 0, false}
if _, ok := n.peers[newPeer]; !ok {
n.peers[newPeer] = &lps{time.Time{}, 0, false}
}
// Store our peer in our global peer map for all peers.
peers.LoadOrStore(newPeer, newPeer)
@@ -4362,10 +4394,6 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
}
n.debug("Received a voteRequest %+v", vr)
if err := n.trackPeer(vr.candidate); err != nil {
return err
}
n.Lock()
vresp := &voteResponse{n.term, n.id, false, n.pindex == 0}

View File

@@ -1077,7 +1077,7 @@ func (p *proxiesReload) Apply(s *Server) {
c.setAuthError(ErrAuthProxyNotTrusted)
c.authViolation()
}
s.Noticef("Reloaded: proxies trusted keys %q were removed", p.add)
s.Noticef("Reloaded: proxies trusted keys %q were removed", p.del)
}
if len(p.add) > 0 {
s.Noticef("Reloaded: proxies trusted keys %q were added", p.add)

View File

@@ -35,6 +35,8 @@ type MsgScheduling struct {
run func()
ttls *thw.HashWheel
timer *time.Timer
running bool
deadline int64
schedules map[string]*MsgSchedule
seqToSubj map[uint64]string
inflight map[string]struct{}
@@ -93,11 +95,25 @@ func (ms *MsgScheduling) remove(seq uint64) {
}
}
func (ms *MsgScheduling) removeSubject(subj string) {
if sched, ok := ms.schedules[subj]; ok {
ms.ttls.Remove(sched.seq, sched.ts)
delete(ms.schedules, subj)
delete(ms.seqToSubj, sched.seq)
}
}
func (ms *MsgScheduling) clearInflight() {
ms.inflight = make(map[string]struct{})
}
func (ms *MsgScheduling) resetTimer() {
// If we're already scheduling messages, it will make sure to reset.
// Don't trigger again, as that could result in many expire goroutines.
if ms.running {
return
}
next := ms.ttls.GetNextExpiration(math.MaxInt64)
if next == math.MaxInt64 {
clearTimer(&ms.timer)
@@ -111,6 +127,14 @@ func (ms *MsgScheduling) resetTimer() {
fireIn = 250 * time.Millisecond
}
// If we want to kick the timer to run later than what was assigned before, don't reset it.
// Otherwise, we could get in a situation where the timer is continuously reset, and it never runs.
deadline := time.Now().UnixNano() + fireIn.Nanoseconds()
if ms.deadline > 0 && deadline > ms.deadline {
return
}
ms.deadline = deadline
if ms.timer != nil {
ms.timer.Reset(fireIn)
} else {

View File

@@ -30,12 +30,6 @@ type SDMBySeq struct {
ts int64 // Last timestamp we proposed a removal/sdm.
}
// SDMBySubj holds whether a message for a specific subject and sequence was a subject delete marker or not.
type SDMBySubj struct {
seq uint64
sdm bool
}
func newSDMMeta() *SDMMeta {
return &SDMMeta{
totals: make(map[string]uint64, 1),
@@ -46,7 +40,7 @@ func newSDMMeta() *SDMMeta {
// isSubjectDeleteMarker returns whether the headers indicate this message is a subject delete marker.
// Either it's a usual marker with JSMarkerReason, or it's a KV Purge marker as the KVOperation.
func isSubjectDeleteMarker(hdr []byte) bool {
return len(sliceHeader(JSMarkerReason, hdr)) == 0 && !bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge)
return len(sliceHeader(JSMarkerReason, hdr)) != 0 || bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge)
}
// empty clears all data.

View File

@@ -16,6 +16,7 @@ package server
import (
"bytes"
"context"
"crypto/fips140"
"crypto/tls"
"encoding/json"
"errors"
@@ -697,6 +698,15 @@ func New(opts *Options) *Server {
return s
}
func NewServerFromConfig(opts *Options) (*Server, error) {
if opts.ConfigFile != _EMPTY_ && opts.configDigest == "" {
if err := opts.ProcessConfigFile(opts.ConfigFile); err != nil {
return nil, err
}
}
return NewServer(opts)
}
// NewServer will setup a new server struct after parsing the options.
// Could return an error if options can not be validated.
// The provided Options type should not be re-used afterwards.
@@ -713,8 +723,12 @@ func NewServer(opts *Options) (*Server, error) {
pub, _ := kp.PublicKey()
// Create an xkey for encrypting messages from this server.
xkp, _ := nkeys.CreateCurveKeys()
xpub, _ := xkp.PublicKey()
var xkp nkeys.KeyPair
var xpub string
if !fips140.Enabled() {
xkp, _ = nkeys.CreateCurveKeys()
xpub, _ = xkp.PublicKey()
}
serverName := pub
if opts.ServerName != _EMPTY_ {

View File

@@ -91,7 +91,7 @@ type ProcessJetStreamMsgHandler func(*inMsg)
type StreamStore interface {
StoreMsg(subject string, hdr, msg []byte, ttl int64) (uint64, int64, error)
StoreRawMsg(subject string, hdr, msg []byte, seq uint64, ts int64, ttl int64) error
SkipMsg() uint64
SkipMsg(seq uint64) (uint64, error)
SkipMsgs(seq uint64, num uint64) error
FlushAllPending()
LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error)

View File

@@ -3143,6 +3143,7 @@ func (mset *stream) setupMirrorConsumer() error {
}
mirror := mset.mirror
mirrorWg := &mirror.wg
// We want to throttle here in terms of how fast we request new consumers,
// or if the previous is still in progress.
@@ -3301,7 +3302,7 @@ func (mset *stream) setupMirrorConsumer() error {
// Wait for previous processMirrorMsgs go routine to be completely done.
// If none is running, this will not block.
mirror.wg.Wait()
mirrorWg.Wait()
select {
case ccr := <-respCh:
@@ -5934,7 +5935,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Skip msg here.
if noInterest {
mset.lseq = store.SkipMsg()
mset.lseq, _ = store.SkipMsg(0)
mset.lmsgId = msgId
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
@@ -6264,7 +6265,6 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
}
return err
}
commit := len(sliceHeader(JSBatchCommit, hdr)) != 0
mset.mu.Lock()
if mset.batches == nil {
@@ -6339,6 +6339,37 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
batches.group[batchId] = b
}
var commit bool
if c := sliceHeader(JSBatchCommit, hdr); c != nil {
// Reject the batch if the commit is not recognized.
if !bytes.Equal(c, []byte("1")) {
b.cleanupLocked(batchId, batches)
batches.mu.Unlock()
err := NewJSAtomicPublishInvalidBatchCommitError()
if canRespond {
b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err})
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0))
}
return err
}
commit = true
}
// The required API level can have the batch be rejected. But the header is always removed.
if len(sliceHeader(JSRequiredApiLevel, hdr)) != 0 {
if errorOnRequiredApiLevel(hdr) {
b.cleanupLocked(batchId, batches)
batches.mu.Unlock()
err := NewJSRequiredApiLevelError()
if canRespond {
b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err})
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0))
}
return err
}
hdr = removeHeaderIfPresent(hdr, JSRequiredApiLevel)
}
// Detect gaps.
b.lseq++
if b.lseq != batchSeq {
@@ -6431,7 +6462,8 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
rollback := func(seq uint64) {
if isClustered {
// TODO(mvv): reset in-memory expected header maps
// Only need to move the clustered sequence back if the batch fails to commit.
// Other changes were staged but not applied, so this is the only thing we need to do.
mset.clseq -= seq - 1
}
mset.clMu.Unlock()
@@ -6475,14 +6507,11 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
}
// Reject unsupported headers.
if msgId := getMsgId(bhdr); msgId != _EMPTY_ {
return errorOnUnsupported(seq, JSMsgId)
}
if getExpectedLastMsgId(hdr) != _EMPTY_ {
return errorOnUnsupported(seq, JSExpectedLastMsgId)
}
if bhdr, bmsg, _, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, bhdr, bmsg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
if bhdr, bmsg, _, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, bsubj, bhdr, bmsg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
rollback(seq)
b.cleanupLocked(batchId, batches)
batches.mu.Unlock()
@@ -6537,12 +6566,9 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
// Do a single multi proposal. This ensures we get to push all entries to the proposal queue in-order
// and not interleaved with other proposals.
diff.commit(mset)
if err := node.ProposeMulti(entries); err == nil {
mset.trackReplicationTraffic(node, sz, r)
} else {
// TODO(mvv): reset in-memory expected header maps
mset.clseq -= batchSeq
}
_ = node.ProposeMulti(entries)
// The proposal can fail, but we always account for trying.
mset.trackReplicationTraffic(node, sz, r)
// Check to see if we are being overrun.
// TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.

View File

@@ -995,6 +995,9 @@ func (n *node) isEmpty() bool {
// Return the number of nodes for the given level.
func (l *level) numNodes() int {
if l == nil {
return 0
}
num := len(l.nodes)
if l.pwc != nil {
num++
@@ -1758,39 +1761,49 @@ func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb f
if len(nsubj) > 0 {
nsubj = append(subj, '.')
}
switch {
case r.fwc != nil:
if r.fwc != nil {
// We've reached a full wildcard, do a FWC match on the stree at this point
// and don't keep iterating downward.
nsubj := append(nsubj, '>')
st.Match(nsubj, cb)
case r.pwc != nil:
return
}
if r.pwc != nil {
// We've found a partial wildcard. We'll keep iterating downwards, but first
// check whether there's interest at this level (without triggering dupes) and
// match if so.
var done bool
nsubj := append(nsubj, '*')
if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 {
st.Match(nsubj, cb)
done = true
}
if r.pwc.next != nil && r.pwc.next.numNodes() > 0 {
if r.pwc.next.numNodes() > 0 {
intersectStree(st, r.pwc.next, nsubj, cb)
}
default:
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
nsubj := append(nsubj, t...)
if len(n.psubs)+len(n.qsubs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
if done {
return
}
}
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
// A wildcard at the next level will already visit these descendents
// so skip so we don't callback the same subject more than once.
continue
}
nsubj := append(nsubj, t...)
if len(n.psubs)+len(n.qsubs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
}
if n.next != nil && n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
if n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
}

View File

@@ -48,6 +48,12 @@ type HashWheel struct {
count uint64 // How many entries are present?
}
// HashWheelEntry represents a single entry in the wheel.
type HashWheelEntry struct {
Seq uint64
Expires int64
}
// NewHashWheel initializes a new HashWheel.
func NewHashWheel() *HashWheel {
return &HashWheel{
@@ -61,17 +67,6 @@ func (hw *HashWheel) getPosition(expires int64) int64 {
return (expires / tickDuration) & wheelMask
}
// updateLowestExpires finds the new lowest expiration time across all slots.
func (hw *HashWheel) updateLowestExpires() {
lowest := int64(math.MaxInt64)
for _, s := range hw.wheel {
if s != nil && s.lowest < lowest {
lowest = s.lowest
}
}
hw.lowest = lowest
}
// newSlot creates a new slot.
func newSlot() *slot {
return &slot{
@@ -120,22 +115,7 @@ func (hw *HashWheel) Remove(seq uint64, expires int64) error {
// If the slot is empty, we can set it to nil to free memory.
if len(s.entries) == 0 {
hw.wheel[pos] = nil
} else if expires == s.lowest {
// Find new lowest in this slot.
lowest := int64(math.MaxInt64)
for _, exp := range s.entries {
if exp < lowest {
lowest = exp
}
}
s.lowest = lowest
}
// If we removed the global lowest, find the new one.
if expires == hw.lowest {
hw.updateLowestExpires()
}
return nil
}

View File

@@ -195,7 +195,7 @@ func (r *Reservation) CancelAt(t time.Time) {
// update state
r.lim.last = t
r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent {
if r.timeToAct.Equal(r.lim.lastEvent) {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(t) {
r.lim.lastEvent = prevEvent

6
vendor/modules.txt vendored
View File

@@ -736,7 +736,7 @@ github.com/google/go-querystring/query
# github.com/google/go-tika v0.3.1
## explicit; go 1.11
github.com/google/go-tika/tika
# github.com/google/go-tpm v0.9.5
# github.com/google/go-tpm v0.9.6
## explicit; go 1.22
github.com/google/go-tpm/legacy/tpm2
github.com/google/go-tpm/tpmutil
@@ -1124,7 +1124,7 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.8.0
## explicit; go 1.23.0
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.12.0
# github.com/nats-io/nats-server/v2 v2.12.1
## explicit; go 1.24.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand
@@ -2481,7 +2481,7 @@ golang.org/x/text/transform
golang.org/x/text/unicode/bidi
golang.org/x/text/unicode/norm
golang.org/x/text/width
# golang.org/x/time v0.13.0
# golang.org/x/time v0.14.0
## explicit; go 1.24.0
golang.org/x/time/rate
# golang.org/x/tools v0.37.0