From f693709667a5d05b354836ebb0e84349b61bd177 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 10 Oct 2024 06:35:42 +0000 Subject: [PATCH] chore(deps): bump github.com/nats-io/nats-server/v2 Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.20 to 2.10.21. - [Release notes](https://github.com/nats-io/nats-server/releases) - [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml) - [Commits](https://github.com/nats-io/nats-server/compare/v2.10.20...v2.10.21) --- updated-dependencies: - dependency-name: github.com/nats-io/nats-server/v2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- .../nats-io/nats-server/v2/server/auth.go | 5 +- .../nats-io/nats-server/v2/server/const.go | 2 +- .../nats-io/nats-server/v2/server/events.go | 4 + .../nats-server/v2/server/filestore.go | 284 +++++++++--------- .../nats-server/v2/server/jetstream.go | 4 + .../nats-server/v2/server/jetstream_api.go | 31 +- .../v2/server/jetstream_cluster.go | 89 +++++- .../nats-server/v2/server/jetstream_events.go | 11 + .../nats-io/nats-server/v2/server/monitor.go | 7 + .../nats-io/nats-server/v2/server/opts.go | 218 +++++++++----- .../nats-io/nats-server/v2/server/reload.go | 2 +- .../nats-io/nats-server/v2/server/server.go | 28 +- .../nats-io/nats-server/v2/server/stream.go | 4 +- vendor/modules.txt | 2 +- 16 files changed, 451 insertions(+), 246 deletions(-) diff --git a/go.mod b/go.mod index 59ba700584..f5709b7810 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.10.20 + github.com/nats-io/nats-server/v2 v2.10.21 github.com/nats-io/nats.go v1.37.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 diff --git a/go.sum b/go.sum index 04856023a8..47033ac173 100644 --- a/go.sum +++ b/go.sum @@ -893,8 +893,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.20 h1:CXDTYNHeBiAKBTAIP2gjpgbWap2GhATnTLgP8etyvEI= -github.com/nats-io/nats-server/v2 v2.10.20/go.mod h1:hgcPnoUtMfxz1qVOvLZGurVypQ+Cg6GXVXjG53iHk+M= +github.com/nats-io/nats-server/v2 v2.10.21 h1:gfG6T06wBdI25XyY2IsauarOc2srWoFxxfsOKjrzoRA= +github.com/nats-io/nats-server/v2 v2.10.21/go.mod h1:I1YxSAEWbXCfy0bthwvNb5X43WwIWMz7gx5ZVPDr5Rc= github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= diff --git a/vendor/github.com/nats-io/nats-server/v2/server/auth.go b/vendor/github.com/nats-io/nats-server/v2/server/auth.go index 0a9564f4d0..5a1a4acd54 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/auth.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/auth.go @@ -872,7 +872,10 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au c.Debugf("Account JWT lookup error: %v", err) return false } - if !s.isTrustedIssuer(acc.Issuer) { + acc.mu.RLock() + aissuer := acc.Issuer + acc.mu.RUnlock() + if !s.isTrustedIssuer(aissuer) { c.Debugf("Account JWT not signed by trusted operator") return false } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index ea43ac4f02..be607d9451 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -55,7 +55,7 @@ func init() { const ( // VERSION is the current version for the server. - VERSION = "2.10.20" + VERSION = "2.10.21" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go index 39de871f98..3f8ef05014 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go @@ -97,6 +97,7 @@ const ( // FIXME(dlc) - make configurable. var eventsHBInterval = 30 * time.Second +var statsHBInterval = 10 * time.Second // Default minimum wait time for sending statsz const defaultStatszRateLimit = 1 * time.Second @@ -944,6 +945,9 @@ func (s *Server) sendStatsz(subj string) { Size: mg.ClusterSize(), } } + if ipq := s.jsAPIRoutedReqs; ipq != nil && jStat.Meta != nil { + jStat.Meta.Pending = ipq.len() + } } m.Stats.JetStream = jStat s.mu.RLock() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index 16d3ef05d4..c8f8b0271d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -436,6 +436,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Attempt to recover our state. err = fs.recoverFullState() if err != nil { + if !os.IsNotExist(err) { + fs.warn("Recovering stream state from index errored: %v", err) + } // Hold onto state prior := fs.state // Reset anything that could have been set from above. @@ -469,7 +472,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim go fs.cleanupOldMeta() }() - // Lock while do enforcements and removals. + // Lock while we do enforcements and removals. fs.mu.Lock() // Check if we have any left over tombstones to process. @@ -975,7 +978,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() { // Lock held on entry func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { mb := fs.initMsgBlock(index) - // Open up the message file, but we will try to recover from the index file. // We will check that the last checksums match. file, err := mb.openBlock() @@ -1357,6 +1359,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { minTombstoneTs int64 ) + // To detect gaps from compaction. + var last uint64 + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize > lbuf { truncate(index) @@ -1444,8 +1449,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { mb.bytes += uint64(rl) } + // Check for any gaps from compaction, meaning no ebit entry. + if last > 0 && seq != last+1 { + for dseq := last + 1; dseq < seq; dseq++ { + addToDmap(dseq) + } + } + // Always set last - atomic.StoreUint64(&mb.last.seq, seq) + last = seq + atomic.StoreUint64(&mb.last.seq, last) mb.last.ts = ts // Advance to next record. @@ -1665,7 +1678,8 @@ func (fs *fileStore) recoverFullState() (rerr error) { for i := 0; i < int(numBlocks); i++ { index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64() if bi < 0 { - break + os.Remove(fn) + return errCorruptState } mb := fs.initMsgBlock(index) atomic.StoreUint64(&mb.first.seq, fseq) @@ -1734,120 +1748,33 @@ func (fs *fileStore) recoverFullState() (rerr error) { return errPriorState } if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched { - // Remove the last message block since recover will add in the new one. - fs.removeMsgBlockFromList(mb) - // Reverse update of tracking state for this mb, will add new state in below. - mstate.Msgs -= mb.msgs - mstate.Bytes -= mb.bytes - if nmb, err := fs.recoverMsgBlock(mb.index); err != nil && !os.IsNotExist(err) { - fs.warn("Stream state could not recover last msg block") - os.Remove(fn) - return errCorruptState - } else if nmb != nil { - fs.adjustAccounting(mb, nmb) - updateTrackingState(&mstate, nmb) - } + // Detected a stale index.db, we didn't write it upon shutdown so can't rely on it being correct. + fs.warn("Stream state outdated, last block has additional entries, will rebuild") + return errPriorState } - // On success double check our state. - checkState := func() error { - // We check first and last seq and number of msgs and bytes. If there is a difference, - // return and error so we rebuild from the message block state on disk. - if !trackingStatesEqual(&fs.state, &mstate) { - fs.warn("Stream state encountered internal inconsistency on recover") - os.Remove(fn) - return errCorruptState - } - return nil + // We need to see if any blocks exist after our last one even though we matched the last record exactly. + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) + var dirs []os.DirEntry + + <-dios + if f, err := os.Open(mdir); err == nil { + dirs, _ = f.ReadDir(-1) + f.Close() } + dios <- struct{}{} - // We may need to check other blocks. Even if we matched last checksum we will see if there is another block. - for bi := blkIndex + 1; ; bi++ { - nmb, err := fs.recoverMsgBlock(bi) - if err != nil { - if os.IsNotExist(err) { - return checkState() - } - os.Remove(fn) - fs.warn("Stream state could not recover msg block %d", bi) - return err - } - if nmb != nil { - // Update top level accounting - if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq { - fs.state.FirstSeq = fseq - fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC() - } - if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq { - fs.state.LastSeq = lseq - fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC() - } - fs.state.Msgs += nmb.msgs - fs.state.Bytes += nmb.bytes - updateTrackingState(&mstate, nmb) - } - } -} - -// adjustAccounting will be called when a stream state was only partially accounted for -// within a message block, e.g. additional records were added after the stream state. -// Lock should be held. -func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) { - nmb.mu.Lock() - defer nmb.mu.Unlock() - - // First make sure the new block is loaded. - if nmb.cacheNotLoaded() { - nmb.loadMsgsWithLock() - } - nmb.ensurePerSubjectInfoLoaded() - - var smv StoreMsg - - // Need to walk previous messages and undo psim stats. - // We already undid msgs and bytes accounting. - for seq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { - // Lookup the message. If an error will be deleted, so can skip. - sm, err := nmb.cacheLookup(seq, &smv) - if err != nil { - continue - } - if len(sm.subj) > 0 && fs.psim != nil { - if info, ok := fs.psim.Find(stringToBytes(sm.subj)); ok { - info.total-- + var index uint32 + for _, fi := range dirs { + if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 { + if index > blkIndex { + fs.warn("Stream state outdated, found extra blocks, will rebuild") + return errPriorState } } } - // Walk only new messages and update accounting at fs level. Any messages that should have - // triggered limits exceeded will be handled after the recovery and prior to the stream - // being available to the system. - for seq, lseq := atomic.LoadUint64(&mb.last.seq)+1, atomic.LoadUint64(&nmb.last.seq); seq <= lseq; seq++ { - // Lookup the message. If an error will be deleted, so can skip. - sm, err := nmb.cacheLookup(seq, &smv) - if err != nil { - continue - } - // Since we found it we just need to adjust fs totals and psim. - fs.state.Msgs++ - fs.state.Bytes += fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) - } - - // Now check to see if we had a higher first for the recovered state mb vs nmb. - if atomic.LoadUint64(&nmb.first.seq) < atomic.LoadUint64(&mb.first.seq) { - // Now set first for nmb. - atomic.StoreUint64(&nmb.first.seq, atomic.LoadUint64(&mb.first.seq)) - } - - // Update top level accounting. - if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq { - fs.state.FirstSeq = fseq - fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC() - } - if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq { - fs.state.LastSeq = lseq - fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC() - } + return nil } // Grabs last checksum for the named block file. @@ -5302,7 +5229,8 @@ func (mb *msgBlock) ensureRawBytesLoaded() error { // Sync msg and index files as needed. This is called from a timer. func (fs *fileStore) syncBlocks() { fs.mu.RLock() - if fs.closed { + // If closed or a snapshot is in progress bail. + if fs.closed || fs.sips > 0 { fs.mu.RUnlock() return } @@ -6786,6 +6714,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } var smv StoreMsg + var tombs []msgId fs.mu.Lock() // We may remove blocks as we purge, so don't range directly on fs.blks @@ -6839,9 +6768,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint purged++ bytes += rl } - // FSS updates. + // PSIM and FSS updates. mb.removeSeqPerSubject(sm.subj, seq) fs.removePerSubject(sm.subj) + // Track tombstones we need to write. + tombs = append(tombs, msgId{sm.seq, sm.ts}) // Check for first message. if seq == atomic.LoadUint64(&mb.first.seq) { @@ -6880,7 +6811,16 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint if firstSeqNeedsUpdate { fs.selectNextFirst() } + fseq := fs.state.FirstSeq + // Write any tombstones as needed. + for _, tomb := range tombs { + if tomb.seq > fseq { + fs.lmb.writeTombstone(tomb.seq, tomb.ts) + } + } + + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() @@ -6923,7 +6863,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { fs.bim = make(map[uint32]*msgBlock) // Clear any per subject tracking. fs.psim, fs.tsl = fs.psim.Empty(), 0 - // Mark dirty + // Mark dirty. fs.dirty++ // Move the msgs directory out of the way, will delete out of band. @@ -6979,6 +6919,11 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil { cb(-int64(purged), -rbytes, 0, _EMPTY_) } @@ -7173,11 +7118,19 @@ SKIP: } fs.state.Bytes -= bytes + // Any existing state file no longer applicable. We will force write a new one + // after we release the lock. + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil && purged > 0 { cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } @@ -7238,6 +7191,40 @@ func (fs *fileStore) reset() error { return nil } +// Return all active tombstones in this msgBlock. +// Write lock should be held. +func (mb *msgBlock) tombs() []msgId { + var tombs []msgId + + if !mb.cacheAlreadyLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + return nil + } + } + + var le = binary.LittleEndian + buf := mb.cache.buf + + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { + if index+msgHdrSize > lbuf { + return tombs + } + hdr := buf[index : index+msgHdrSize] + rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]) + // Clear any headers bit that could be set. + rl &^= hbit + // Check for tombstones. + if seq&tbit != 0 { + ts := int64(le.Uint64(hdr[12:])) + tombs = append(tombs, msgId{seq &^ tbit, ts}) + } + // Advance to next record. + index += rl + } + + return tombs +} + // Truncate will truncate a stream store up to seq. Sequence needs to be valid. func (fs *fileStore) Truncate(seq uint64) error { // Check for request to reset. @@ -7273,6 +7260,10 @@ func (fs *fileStore) Truncate(seq uint64) error { fs.mu.Unlock() return err } + // Collect all tombstones, we want to put these back so we can survive + // a restore without index.db properly. + var tombs []msgId + tombs = append(tombs, nlmb.tombs()...) var purged, bytes uint64 @@ -7290,6 +7281,8 @@ func (fs *fileStore) Truncate(seq uint64) error { getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] } for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() { mb.mu.Lock() + // We do this to load tombs. + tombs = append(tombs, mb.tombs()...) purged += mb.msgs bytes += mb.bytes fs.removeMsgBlock(mb) @@ -7312,11 +7305,29 @@ func (fs *fileStore) Truncate(seq uint64) error { // Reset our subject lookup info. fs.resetGlobalPerSubjectInfo() + // Always create new write block. + fs.newMsgBlockForWrite() + + // Write any tombstones as needed. + for _, tomb := range tombs { + if tomb.seq <= lsm.seq { + fs.lmb.writeTombstone(tomb.seq, tomb.ts) + } + } + + // Any existing state file no longer applicable. We will force write a new one + // after we release the lock. + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil { cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } @@ -8138,26 +8149,6 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) { msgPre := msgDir + "/" var bbuf []byte - const minLen = 32 - sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) - if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen { - if fs.aek != nil { - ns := fs.aek.NonceSize() - buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil) - if err == nil { - // Redo hash checksum at end on plaintext. - fs.mu.Lock() - hh.Reset() - hh.Write(buf) - buf = fs.hh.Sum(buf) - fs.mu.Unlock() - } - } - if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil { - return - } - } - // Now do messages themselves. for _, mb := range blks { if mb.pendingWriteSize() > 0 { @@ -8196,6 +8187,30 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) { } } + // Do index.db last. We will force a write as well. + // Write out full state as well before proceeding. + if err := fs.forceWriteFullState(); err == nil { + const minLen = 32 + sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) + if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen { + if fs.aek != nil { + ns := fs.aek.NonceSize() + buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil) + if err == nil { + // Redo hash checksum at end on plaintext. + fs.mu.Lock() + hh.Reset() + hh.Write(buf) + buf = fs.hh.Sum(buf) + fs.mu.Unlock() + } + } + if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil { + return + } + } + } + // Bail if no consumers requested. if !includeConsumers { return @@ -8268,9 +8283,6 @@ func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumer } } - // Write out full state as well before proceeding. - fs.writeFullState() - pr, pw := net.Pipe() // Set a write deadline here to protect ourselves. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go index 4ace6731ce..4c56b79775 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go @@ -105,6 +105,7 @@ type jetStream struct { storeReserved int64 memUsed int64 storeUsed int64 + queueLimit int64 clustered int32 mu sync.RWMutex srv *Server @@ -377,6 +378,9 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { } s.gcbMu.Unlock() + // TODO: Not currently reloadable. + atomic.StoreInt64(&js.queueLimit, s.getOpts().JetStreamRequestQueueLimit) + s.js.Store(js) // FIXME(dlc) - Allow memory only operation? diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go index 479babf81c..7bcc7d37eb 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go @@ -299,6 +299,9 @@ const ( // JSAdvisoryServerRemoved notification that a server has been removed from the system. JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED" + // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit. + JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED" + // JSAuditAdvisory is a notification about JetStream API access. // FIXME - Add in details about who.. JSAuditAdvisory = "$JS.EVENT.ADVISORY.API" @@ -346,6 +349,10 @@ const JSMaxMetadataLen = 128 * 1024 // Picked 255 as it seems to be a widely used file name limit const JSMaxNameLen = 255 +// JSDefaultRequestQueueLimit is the default number of entries that we will +// put on the global request queue before we react. +const JSDefaultRequestQueueLimit = 10_000 + // Responses for API calls. // ApiResponse is a standard response from the JetStream JSON API @@ -825,10 +832,22 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // Copy the state. Note the JSAPI only uses the hdr index to piece apart the // header from the msg body. No other references are needed. // Check pending and warn if getting backed up. - const warnThresh = 128 pending := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) - if pending >= warnThresh { - s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending) + limit := atomic.LoadInt64(&js.queueLimit) + if pending >= int(limit) { + s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending) + s.jsAPIRoutedReqs.drain() + + s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ + TypedEvent: TypedEvent{ + Type: JSAPILimitReachedAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Server: s.Name(), + Domain: js.config.Domain, + Dropped: int64(pending), + }) } } @@ -2745,6 +2764,12 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun return } + // This should only be coming from the System Account. + if acc != s.SystemAccount() { + s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User) + return + } + js, cc := s.getJetStreamCluster() if js == nil || cc == nil || cc.meta == nil { return diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 18a67c32e9..4c9e0931ae 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -583,10 +583,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum return false } s := js.srv - js.mu.RUnlock() - // Capture RAFT node from assignment. node := ca.Group.node + js.mu.RUnlock() // When we try to restart we nil out the node if applicable // and reprocess the consumer assignment. @@ -855,7 +854,6 @@ func (js *jetStream) setupMetaGroup() error { } c := s.createInternalJetStreamClient() - sacc := s.SystemAccount() js.mu.Lock() defer js.mu.Unlock() @@ -867,7 +865,7 @@ func (js *jetStream) setupMetaGroup() error { qch: make(chan struct{}), } atomic.StoreInt32(&js.clustered, 1) - c.registerWithAccount(sacc) + c.registerWithAccount(sysAcc) // Set to true before we start. js.metaRecovering = true @@ -875,7 +873,7 @@ func (js *jetStream) setupMetaGroup() error { js.monitorCluster, pprofLabels{ "type": "metaleader", - "account": sacc.Name, + "account": sysAcc.Name, }, ) return nil @@ -1039,7 +1037,7 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool { ourID := cc.meta.ID() for _, peer := range rg.Peers { if peer == ourID { - if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() { + if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) { return true } } @@ -1201,7 +1199,12 @@ func (js *jetStream) checkForOrphans() { stream = mset.cfg.Name mset.mu.RUnlock() } - s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + if o.isDurable() { + s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + } else { + s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + } + if err := o.delete(); err != nil { s.Warnf("Deleting consumer encountered an error: %v", err) } @@ -3792,7 +3795,8 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme } mset.setStreamAssignment(sa) // Check if our config has really been updated. - if !reflect.DeepEqual(mset.config(), sa.Config) { + cfg := mset.config() + if !reflect.DeepEqual(&cfg, sa.Config) { if err = mset.updateWithAdvisory(sa.Config, false); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) if osa != nil { @@ -6331,8 +6335,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } if isReplicaChange { + isScaleUp := newCfg.Replicas > len(rg.Peers) // We are adding new peers here. - if newCfg.Replicas > len(rg.Peers) { + if isScaleUp { // Check that we have the allocation available. if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil { resp.Error = err @@ -6408,22 +6413,82 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Need to remap any consumers. for _, ca := range osa.consumers { - // Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy. + // Legacy ephemerals are R=1 but present as R=0, so only auto-remap named consumers, or if we are downsizing the consumer peers. + // If stream is interest or workqueue policy always remaps since they require peer parity with stream. numPeers := len(ca.Group.Peers) - if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy { + isAutoScale := ca.Config.Replicas == 0 && (ca.Config.Durable != _EMPTY_ || ca.Config.Name != _EMPTY_) + if isAutoScale || numPeers > len(rg.Peers) || cfg.Retention != LimitsPolicy { cca := ca.copyGroup() // Adjust preferred as needed. - if numPeers == 1 && len(rg.Peers) > 1 { + if numPeers == 1 && isScaleUp { cca.Group.Preferred = ca.Group.Peers[0] } else { cca.Group.Preferred = _EMPTY_ } // Assign new peers. cca.Group.Peers = rg.Peers + // If the replicas was not 0 make sure it matches here. + if cca.Config.Replicas != 0 { + cca.Config.Replicas = len(rg.Peers) + } // We can not propose here before the stream itself so we collect them. consumers = append(consumers, cca) + + } else if !isScaleUp { + // We decided to leave this consumer's peer group alone but we are also scaling down. + // We need to make sure we do not have any peers that are no longer part of the stream. + // Note we handle down scaling of a consumer above if its number of peers were > new stream peers. + var needReplace []string + for _, rp := range ca.Group.Peers { + // Check if we have an orphaned peer now for this consumer. + if !rg.isMember(rp) { + needReplace = append(needReplace, rp) + } + } + if len(needReplace) > 0 { + newPeers := copyStrings(rg.Peers) + rand.Shuffle(len(newPeers), func(i, j int) { newPeers[i], newPeers[j] = newPeers[j], newPeers[i] }) + // If we had a small size then the peer set, restrict to the same number. + if lp := len(ca.Group.Peers); lp < len(newPeers) { + newPeers = newPeers[:lp] + } + cca := ca.copyGroup() + // Assign new peers. + cca.Group.Peers = newPeers + // If the replicas was not 0 make sure it matches here. + if cca.Config.Replicas != 0 { + cca.Config.Replicas = len(newPeers) + } + // Check if all peers are invalid. This can happen with R1 under replicated streams that are being scaled down. + if len(needReplace) == len(ca.Group.Peers) { + // We have to transfer state to new peers. + // we will grab our state and attach to the new assignment. + // TODO(dlc) - In practice we would want to make sure the consumer is paused. + // Need to release js lock. + js.mu.Unlock() + if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, acc, osa.Config.Name, ca.Name); err != nil { + s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, osa.Config.Name, ca.Name, err) + } else if ci != nil { + cca.State = &ConsumerState{ + Delivered: SequencePair{ + Consumer: ci.Delivered.Consumer, + Stream: ci.Delivered.Stream, + }, + AckFloor: SequencePair{ + Consumer: ci.AckFloor.Consumer, + Stream: ci.AckFloor.Stream, + }, + } + } + // Re-acquire here. + js.mu.Lock() + } + // We can not propose here before the stream itself so we collect them. + consumers = append(consumers, cca) + } } } + } else if isMoveRequest { if len(peerSet) == 0 { nrg, err := js.createGroupForStream(ci, newCfg) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go index 1852811bb9..8302fcc404 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go @@ -283,3 +283,14 @@ type JSServerRemovedAdvisory struct { Cluster string `json:"cluster"` Domain string `json:"domain,omitempty"` } + +// JSAPILimitReachedAdvisoryType is sent when the JS API request queue limit is reached. +const JSAPILimitReachedAdvisoryType = "io.nats.jetstream.advisory.v1.api_limit_reached" + +// JSAPILimitReachedAdvisory is a advisory published when JetStream hits the queue length limit. +type JSAPILimitReachedAdvisory struct { + TypedEvent + Server string `json:"server"` // Server that created the event, name or ID + Domain string `json:"domain,omitempty"` // Domain the server belongs to + Dropped int64 `json:"dropped"` // How many messages did we drop from the queue +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go index 46e262e515..2bd25f9a7b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go @@ -1462,6 +1462,9 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) { if ci.Leader == s.info.Name { v.Meta.Replicas = ci.Replicas } + if ipq := s.jsAPIRoutedReqs; ipq != nil { + v.Meta.Pending = ipq.len() + } } } } @@ -2791,6 +2794,7 @@ type MetaClusterInfo struct { Peer string `json:"peer,omitempty"` Replicas []*PeerInfo `json:"replicas,omitempty"` Size int `json:"cluster_size"` + Pending int `json:"pending"` } // JSInfo has detailed information on JetStream. @@ -2990,6 +2994,9 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { if isLeader { jsi.Meta.Replicas = ci.Replicas } + if ipq := s.jsAPIRoutedReqs; ipq != nil { + jsi.Meta.Pending = ipq.len() + } } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/opts.go b/vendor/github.com/nats-io/nats-server/v2/server/opts.go index ac3988235f..0b4ed483dc 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/opts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/opts.go @@ -251,86 +251,87 @@ type AuthCallout struct { // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. type Options struct { - ConfigFile string `json:"-"` - ServerName string `json:"server_name"` - Host string `json:"addr"` - Port int `json:"port"` - DontListen bool `json:"dont_listen"` - ClientAdvertise string `json:"-"` - Trace bool `json:"-"` - Debug bool `json:"-"` - TraceVerbose bool `json:"-"` - NoLog bool `json:"-"` - NoSigs bool `json:"-"` - NoSublistCache bool `json:"-"` - NoHeaderSupport bool `json:"-"` - DisableShortFirstPing bool `json:"-"` - Logtime bool `json:"-"` - LogtimeUTC bool `json:"-"` - MaxConn int `json:"max_connections"` - MaxSubs int `json:"max_subscriptions,omitempty"` - MaxSubTokens uint8 `json:"-"` - Nkeys []*NkeyUser `json:"-"` - Users []*User `json:"-"` - Accounts []*Account `json:"-"` - NoAuthUser string `json:"-"` - SystemAccount string `json:"-"` - NoSystemAccount bool `json:"-"` - Username string `json:"-"` - Password string `json:"-"` - Authorization string `json:"-"` - AuthCallout *AuthCallout `json:"-"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HTTPHost string `json:"http_host"` - HTTPPort int `json:"http_port"` - HTTPBasePath string `json:"http_base_path"` - HTTPSPort int `json:"https_port"` - AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int32 `json:"max_control_line"` - MaxPayload int32 `json:"max_payload"` - MaxPending int64 `json:"max_pending"` - Cluster ClusterOpts `json:"cluster,omitempty"` - Gateway GatewayOpts `json:"gateway,omitempty"` - LeafNode LeafNodeOpts `json:"leaf,omitempty"` - JetStream bool `json:"jetstream"` - JetStreamMaxMemory int64 `json:"-"` - JetStreamMaxStore int64 `json:"-"` - JetStreamDomain string `json:"-"` - JetStreamExtHint string `json:"-"` - JetStreamKey string `json:"-"` - JetStreamOldKey string `json:"-"` - JetStreamCipher StoreCipher `json:"-"` - JetStreamUniqueTag string - JetStreamLimits JSLimitOpts - JetStreamMaxCatchup int64 - StoreDir string `json:"-"` - SyncInterval time.Duration `json:"-"` - SyncAlways bool `json:"-"` - JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping - Websocket WebsocketOpts `json:"-"` - MQTT MQTTOpts `json:"-"` - ProfPort int `json:"-"` - ProfBlockRate int `json:"-"` - PidFile string `json:"-"` - PortsFileDir string `json:"-"` - LogFile string `json:"-"` - LogSizeLimit int64 `json:"-"` - LogMaxFiles int64 `json:"-"` - Syslog bool `json:"-"` - RemoteSyslog string `json:"-"` - Routes []*url.URL `json:"-"` - RoutesStr string `json:"-"` - TLSTimeout float64 `json:"tls_timeout"` - TLS bool `json:"-"` - TLSVerify bool `json:"-"` - TLSMap bool `json:"-"` - TLSCert string `json:"-"` - TLSKey string `json:"-"` - TLSCaCert string `json:"-"` - TLSConfig *tls.Config `json:"-"` - TLSPinnedCerts PinnedCertSet `json:"-"` - TLSRateLimit int64 `json:"-"` + ConfigFile string `json:"-"` + ServerName string `json:"server_name"` + Host string `json:"addr"` + Port int `json:"port"` + DontListen bool `json:"dont_listen"` + ClientAdvertise string `json:"-"` + Trace bool `json:"-"` + Debug bool `json:"-"` + TraceVerbose bool `json:"-"` + NoLog bool `json:"-"` + NoSigs bool `json:"-"` + NoSublistCache bool `json:"-"` + NoHeaderSupport bool `json:"-"` + DisableShortFirstPing bool `json:"-"` + Logtime bool `json:"-"` + LogtimeUTC bool `json:"-"` + MaxConn int `json:"max_connections"` + MaxSubs int `json:"max_subscriptions,omitempty"` + MaxSubTokens uint8 `json:"-"` + Nkeys []*NkeyUser `json:"-"` + Users []*User `json:"-"` + Accounts []*Account `json:"-"` + NoAuthUser string `json:"-"` + SystemAccount string `json:"-"` + NoSystemAccount bool `json:"-"` + Username string `json:"-"` + Password string `json:"-"` + Authorization string `json:"-"` + AuthCallout *AuthCallout `json:"-"` + PingInterval time.Duration `json:"ping_interval"` + MaxPingsOut int `json:"ping_max"` + HTTPHost string `json:"http_host"` + HTTPPort int `json:"http_port"` + HTTPBasePath string `json:"http_base_path"` + HTTPSPort int `json:"https_port"` + AuthTimeout float64 `json:"auth_timeout"` + MaxControlLine int32 `json:"max_control_line"` + MaxPayload int32 `json:"max_payload"` + MaxPending int64 `json:"max_pending"` + Cluster ClusterOpts `json:"cluster,omitempty"` + Gateway GatewayOpts `json:"gateway,omitempty"` + LeafNode LeafNodeOpts `json:"leaf,omitempty"` + JetStream bool `json:"jetstream"` + JetStreamMaxMemory int64 `json:"-"` + JetStreamMaxStore int64 `json:"-"` + JetStreamDomain string `json:"-"` + JetStreamExtHint string `json:"-"` + JetStreamKey string `json:"-"` + JetStreamOldKey string `json:"-"` + JetStreamCipher StoreCipher `json:"-"` + JetStreamUniqueTag string + JetStreamLimits JSLimitOpts + JetStreamMaxCatchup int64 + JetStreamRequestQueueLimit int64 + StoreDir string `json:"-"` + SyncInterval time.Duration `json:"-"` + SyncAlways bool `json:"-"` + JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping + Websocket WebsocketOpts `json:"-"` + MQTT MQTTOpts `json:"-"` + ProfPort int `json:"-"` + ProfBlockRate int `json:"-"` + PidFile string `json:"-"` + PortsFileDir string `json:"-"` + LogFile string `json:"-"` + LogSizeLimit int64 `json:"-"` + LogMaxFiles int64 `json:"-"` + Syslog bool `json:"-"` + RemoteSyslog string `json:"-"` + Routes []*url.URL `json:"-"` + RoutesStr string `json:"-"` + TLSTimeout float64 `json:"tls_timeout"` + TLS bool `json:"-"` + TLSVerify bool `json:"-"` + TLSMap bool `json:"-"` + TLSCert string `json:"-"` + TLSKey string `json:"-"` + TLSCaCert string `json:"-"` + TLSConfig *tls.Config `json:"-"` + TLSPinnedCerts PinnedCertSet `json:"-"` + TLSRateLimit int64 `json:"-"` // When set to true, the server will perform the TLS handshake before // sending the INFO protocol. For clients that are not configured // with a similar option, their connection will fail with some sort @@ -675,6 +676,7 @@ type TLSConfigOpts struct { CertMatch string OCSPPeerConfig *certidp.OCSPPeerConfig Certificates []*TLSCertPairOpt + MinVersion uint16 } // TLSCertPairOpt are the paths to a certificate and private key. @@ -1195,6 +1197,22 @@ func (o *Options) processConfigFileLine(k string, v any, errors *[]error, warnin opFiles = append(opFiles, v) case []string: opFiles = append(opFiles, v...) + case []any: + for _, t := range v { + if token, ok := t.(token); ok { + if v, ok := token.Value().(string); ok { + opFiles = append(opFiles, v) + } else { + err := &configErr{tk, fmt.Sprintf("error parsing operators: unsupported type %T where string is expected", token)} + *errors = append(*errors, err) + break + } + } else { + err := &configErr{tk, fmt.Sprintf("error parsing operators: unsupported type %T", t)} + *errors = append(*errors, err) + break + } + } default: err := &configErr{tk, fmt.Sprintf("error parsing operators: unsupported type %T", v)} *errors = append(*errors, err) @@ -2218,6 +2236,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)} } opts.JetStreamMaxCatchup = s + case "request_queue_limit": + lim, ok := mv.(int64) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} + } + opts.JetStreamRequestQueueLimit = lim default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -4214,6 +4238,24 @@ func parseCurvePreferences(curveName string) (tls.CurveID, error) { return curve, nil } +func parseTLSVersion(v any) (uint16, error) { + var tlsVersionNumber uint16 + switch v := v.(type) { + case string: + n, err := tlsVersionFromString(v) + if err != nil { + return 0, err + } + tlsVersionNumber = n + default: + return 0, fmt.Errorf("'min_version' wrong type: %v", v) + } + if tlsVersionNumber < tls.VersionTLS12 { + return 0, fmt.Errorf("unsupported TLS version: %s", tls.VersionName(tlsVersionNumber)) + } + return tlsVersionNumber, nil +} + // Helper function to parse TLS configs. func parseTLS(v any, isClientCtx bool) (t *TLSConfigOpts, retErr error) { var ( @@ -4457,6 +4499,12 @@ func parseTLS(v any, isClientCtx bool) (t *TLSConfigOpts, retErr error) { } tc.Certificates[i] = certPair } + case "min_version": + minVersion, err := parseTLSVersion(mv) + if err != nil { + return nil, &configErr{tk, fmt.Sprintf("error parsing tls config: %v", err)} + } + tc.MinVersion = minVersion default: return nil, &configErr{tk, fmt.Sprintf("error parsing tls config, unknown field %q", mk)} } @@ -4808,6 +4856,13 @@ func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error) { } config.ClientCAs = pool } + // Allow setting TLS minimum version. + if tc.MinVersion > 0 { + if tc.MinVersion < tls.VersionTLS12 { + return nil, fmt.Errorf("unsupported minimum TLS version: %s", tls.VersionName(tc.MinVersion)) + } + config.MinVersion = tc.MinVersion + } return &config, nil } @@ -5177,6 +5232,9 @@ func setBaselineOptions(opts *Options) { if opts.SyncInterval == 0 && !opts.syncSet { opts.SyncInterval = defaultSyncInterval } + if opts.JetStreamRequestQueueLimit <= 0 { + opts.JetStreamRequestQueueLimit = JSDefaultRequestQueueLimit + } } func getDefaultAuthTimeout(tls *tls.Config, tlsTimeout float64) float64 { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/reload.go b/vendor/github.com/nats-io/nats-server/v2/server/reload.go index 15bbae1e38..347fcfd8b7 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/reload.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/reload.go @@ -1150,7 +1150,7 @@ func imposeOrder(value any) error { slices.SortFunc(value.Gateways, func(i, j *RemoteGatewayOpts) int { return cmp.Compare(i.Name, j.Name) }) case WebsocketOpts: slices.Sort(value.AllowedOrigins) - case string, bool, uint8, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, + case string, bool, uint8, uint16, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList, *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig: // explicitly skipped types diff --git a/vendor/github.com/nats-io/nats-server/v2/server/server.go b/vendor/github.com/nats-io/nats-server/v2/server/server.go index 4907bd5b85..49388d3008 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/server.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/server.go @@ -1705,7 +1705,7 @@ func (s *Server) setSystemAccount(acc *Account) error { recvqp: newIPQueue[*inSysMsg](s, "System recvQ Pings"), resetCh: make(chan struct{}), sq: s.newSendQ(), - statsz: eventsHBInterval, + statsz: statsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval, } @@ -1990,9 +1990,9 @@ func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error accClaims, _, err := s.verifyAccountClaims(claimJWT) if err == nil && accClaims != nil { acc.mu.Lock() - if acc.Issuer == _EMPTY_ { - acc.Issuer = accClaims.Issuer - } + // if an account is updated with a different operator signing key, we want to + // show a consistent issuer. + acc.Issuer = accClaims.Issuer if acc.Name != accClaims.Subject { acc.mu.Unlock() return ErrAccountValidation @@ -2926,8 +2926,10 @@ func (s *Server) startMonitoring(secure bool) error { } hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port)) config := opts.TLSConfig.Clone() - config.GetConfigForClient = s.getMonitoringTLSConfig - config.ClientAuth = tls.NoClientCert + if !s.ocspPeerVerify { + config.GetConfigForClient = s.getMonitoringTLSConfig + config.ClientAuth = tls.NoClientCert + } httpListener, err = tls.Listen("tcp", hp, config) } else { @@ -3441,6 +3443,20 @@ func tlsVersion(ver uint16) string { return fmt.Sprintf("Unknown [0x%x]", ver) } +func tlsVersionFromString(ver string) (uint16, error) { + switch ver { + case "1.0": + return tls.VersionTLS10, nil + case "1.1": + return tls.VersionTLS11, nil + case "1.2": + return tls.VersionTLS12, nil + case "1.3": + return tls.VersionTLS13, nil + } + return 0, fmt.Errorf("unknown version: %v", ver) +} + // We use hex here so we don't need multiple versions func tlsCipher(cs uint16) string { name, present := cipherMapByID[cs] diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index fefbc1f11d..428891cf9d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -650,9 +650,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt mset.store.FastState(&state) // Possible race with consumer.setLeader during recovery. - mset.mu.RLock() + mset.mu.Lock() mset.lseq = state.LastSeq - mset.mu.RUnlock() + mset.mu.Unlock() // If no msgs (new stream), set dedupe state loaded to true. if state.Msgs == 0 { diff --git a/vendor/modules.txt b/vendor/modules.txt index 7886aefb9b..308a4aece9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1431,7 +1431,7 @@ github.com/munnerz/goautoneg # github.com/nats-io/jwt/v2 v2.5.8 ## explicit; go 1.18 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.10.20 +# github.com/nats-io/nats-server/v2 v2.10.21 ## explicit; go 1.21.0 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/fastrand