Merge pull request #10277 from owncloud/dependabot/go_modules/github.com/nats-io/nats-server/v2-2.10.21

chore(deps): bump github.com/nats-io/nats-server/v2 from 2.10.20 to 2.10.21
Michael Barz authored on 2024-10-10 10:20:39 +02:00, committed by GitHub
16 changed files with 451 additions and 246 deletions

go.mod

@@ -62,7 +62,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.20
github.com/nats-io/nats-server/v2 v2.10.21
github.com/nats-io/nats.go v1.37.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5

go.sum

@@ -893,8 +893,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.20 h1:CXDTYNHeBiAKBTAIP2gjpgbWap2GhATnTLgP8etyvEI=
github.com/nats-io/nats-server/v2 v2.10.20/go.mod h1:hgcPnoUtMfxz1qVOvLZGurVypQ+Cg6GXVXjG53iHk+M=
github.com/nats-io/nats-server/v2 v2.10.21 h1:gfG6T06wBdI25XyY2IsauarOc2srWoFxxfsOKjrzoRA=
github.com/nats-io/nats-server/v2 v2.10.21/go.mod h1:I1YxSAEWbXCfy0bthwvNb5X43WwIWMz7gx5ZVPDr5Rc=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=

vendor/github.com/nats-io/nats-server/v2/server/auth.go

@@ -872,7 +872,10 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
c.Debugf("Account JWT lookup error: %v", err)
return false
}
if !s.isTrustedIssuer(acc.Issuer) {
acc.mu.RLock()
aissuer := acc.Issuer
acc.mu.RUnlock()
if !s.isTrustedIssuer(aissuer) {
c.Debugf("Account JWT not signed by trusted operator")
return false
}
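The hunk above fixes a read of acc.Issuer that previously happened without the account lock held. A minimal sketch of the copy-under-RLock pattern, using hypothetical stand-in types rather than the server's own:

package main

import (
	"fmt"
	"sync"
)

// account is a stand-in for the server's Account type (hypothetical).
type account struct {
	mu     sync.RWMutex
	Issuer string
}

// trustedIssuer copies the field under the read lock, then calls out
// without holding it, so a concurrent claims update cannot race the read.
func trustedIssuer(acc *account, isTrusted func(string) bool) bool {
	acc.mu.RLock()
	issuer := acc.Issuer // read while protected
	acc.mu.RUnlock()
	return isTrusted(issuer) // no lock held during the callout
}

func main() {
	acc := &account{Issuer: "OABC"}
	fmt.Println(trustedIssuer(acc, func(iss string) bool { return iss == "OABC" }))
}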

vendor/github.com/nats-io/nats-server/v2/server/const.go

@@ -55,7 +55,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.10.20"
VERSION = "2.10.21"
// PROTO is the currently supported protocol.
// 0 was the original

vendor/github.com/nats-io/nats-server/v2/server/events.go

@@ -97,6 +97,7 @@ const (
// FIXME(dlc) - make configurable.
var eventsHBInterval = 30 * time.Second
var statsHBInterval = 10 * time.Second
// Default minimum wait time for sending statsz
const defaultStatszRateLimit = 1 * time.Second
@@ -944,6 +945,9 @@ func (s *Server) sendStatsz(subj string) {
Size: mg.ClusterSize(),
}
}
if ipq := s.jsAPIRoutedReqs; ipq != nil && jStat.Meta != nil {
jStat.Meta.Pending = ipq.len()
}
}
m.Stats.JetStream = jStat
s.mu.RLock()

vendor/github.com/nats-io/nats-server/v2/server/filestore.go

@@ -436,6 +436,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Attempt to recover our state.
err = fs.recoverFullState()
if err != nil {
if !os.IsNotExist(err) {
fs.warn("Recovering stream state from index errored: %v", err)
}
// Hold onto state
prior := fs.state
// Reset anything that could have been set from above.
@@ -469,7 +472,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
go fs.cleanupOldMeta()
}()
// Lock while do enforcements and removals.
// Lock while we do enforcements and removals.
fs.mu.Lock()
// Check if we have any left over tombstones to process.
@@ -975,7 +978,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() {
// Lock held on entry
func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
mb := fs.initMsgBlock(index)
// Open up the message file, but we will try to recover from the index file.
// We will check that the last checksums match.
file, err := mb.openBlock()
@@ -1357,6 +1359,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
minTombstoneTs int64
)
// To detect gaps from compaction.
var last uint64
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
truncate(index)
@@ -1444,8 +1449,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
mb.bytes += uint64(rl)
}
// Check for any gaps from compaction, meaning no ebit entry.
if last > 0 && seq != last+1 {
for dseq := last + 1; dseq < seq; dseq++ {
addToDmap(dseq)
}
}
// Always set last
atomic.StoreUint64(&mb.last.seq, seq)
last = seq
atomic.StoreUint64(&mb.last.seq, last)
mb.last.ts = ts
// Advance to next record.
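The new last-sequence tracking above fills the delete map for sequences that compaction removed (no record on disk, no ebit entry). A small self-contained illustration of the same gap-fill logic:

package main

import "fmt"

func main() {
	// Sequences recovered from a compacted block: 6 and 7 were
	// compacted away, so no records (and no ebit entries) exist for them.
	seqs := []uint64{4, 5, 8, 9}
	dmap := map[uint64]struct{}{}
	var last uint64
	for _, seq := range seqs {
		// A gap from the previous record means compaction removed
		// interior messages; mark them as deleted.
		if last > 0 && seq != last+1 {
			for dseq := last + 1; dseq < seq; dseq++ {
				dmap[dseq] = struct{}{}
			}
		}
		// Always set last.
		last = seq
	}
	fmt.Println(dmap) // map[6:{} 7:{}]
}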
@@ -1665,7 +1678,8 @@ func (fs *fileStore) recoverFullState() (rerr error) {
for i := 0; i < int(numBlocks); i++ {
index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64()
if bi < 0 {
break
os.Remove(fn)
return errCorruptState
}
mb := fs.initMsgBlock(index)
atomic.StoreUint64(&mb.first.seq, fseq)
@@ -1734,120 +1748,33 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since recover will add in the new one.
fs.removeMsgBlockFromList(mb)
// Reverse update of tracking state for this mb, will add new state in below.
mstate.Msgs -= mb.msgs
mstate.Bytes -= mb.bytes
if nmb, err := fs.recoverMsgBlock(mb.index); err != nil && !os.IsNotExist(err) {
fs.warn("Stream state could not recover last msg block")
os.Remove(fn)
return errCorruptState
} else if nmb != nil {
fs.adjustAccounting(mb, nmb)
updateTrackingState(&mstate, nmb)
}
// Detected a stale index.db, we didn't write it upon shutdown so can't rely on it being correct.
fs.warn("Stream state outdated, last block has additional entries, will rebuild")
return errPriorState
}
// On success double check our state.
checkState := func() error {
// We check first and last seq and number of msgs and bytes. If there is a difference,
// return an error so we rebuild from the message block state on disk.
if !trackingStatesEqual(&fs.state, &mstate) {
fs.warn("Stream state encountered internal inconsistency on recover")
os.Remove(fn)
return errCorruptState
}
return nil
// We need to see if any blocks exist after our last one even though we matched the last record exactly.
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
var dirs []os.DirEntry
<-dios
if f, err := os.Open(mdir); err == nil {
dirs, _ = f.ReadDir(-1)
f.Close()
}
dios <- struct{}{}
// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
for bi := blkIndex + 1; ; bi++ {
nmb, err := fs.recoverMsgBlock(bi)
if err != nil {
if os.IsNotExist(err) {
return checkState()
}
os.Remove(fn)
fs.warn("Stream state could not recover msg block %d", bi)
return err
}
if nmb != nil {
// Update top level accounting
if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq {
fs.state.FirstSeq = fseq
fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
}
if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq {
fs.state.LastSeq = lseq
fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
}
fs.state.Msgs += nmb.msgs
fs.state.Bytes += nmb.bytes
updateTrackingState(&mstate, nmb)
}
}
}
// adjustAccounting will be called when a stream state was only partially accounted for
// within a message block, e.g. additional records were added after the stream state.
// Lock should be held.
func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) {
nmb.mu.Lock()
defer nmb.mu.Unlock()
// First make sure the new block is loaded.
if nmb.cacheNotLoaded() {
nmb.loadMsgsWithLock()
}
nmb.ensurePerSubjectInfoLoaded()
var smv StoreMsg
// Need to walk previous messages and undo psim stats.
// We already undid msgs and bytes accounting.
for seq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ {
// Lookup the message. If it errors it will be deleted, so we can skip.
sm, err := nmb.cacheLookup(seq, &smv)
if err != nil {
continue
}
if len(sm.subj) > 0 && fs.psim != nil {
if info, ok := fs.psim.Find(stringToBytes(sm.subj)); ok {
info.total--
var index uint32
for _, fi := range dirs {
if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
if index > blkIndex {
fs.warn("Stream state outdated, found extra blocks, will rebuild")
return errPriorState
}
}
}
// Walk only new messages and update accounting at fs level. Any messages that should have
// triggered limits exceeded will be handled after the recovery and prior to the stream
// being available to the system.
for seq, lseq := atomic.LoadUint64(&mb.last.seq)+1, atomic.LoadUint64(&nmb.last.seq); seq <= lseq; seq++ {
// Lookup the message. If it errors it will be deleted, so we can skip.
sm, err := nmb.cacheLookup(seq, &smv)
if err != nil {
continue
}
// Since we found it we just need to adjust fs totals and psim.
fs.state.Msgs++
fs.state.Bytes += fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
}
// Now check to see if we had a higher first for the recovered state mb vs nmb.
if atomic.LoadUint64(&nmb.first.seq) < atomic.LoadUint64(&mb.first.seq) {
// Now set first for nmb.
atomic.StoreUint64(&nmb.first.seq, atomic.LoadUint64(&mb.first.seq))
}
// Update top level accounting.
if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq {
fs.state.FirstSeq = fseq
fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
}
if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq {
fs.state.LastSeq = lseq
fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
}
return nil
}
// Grabs last checksum for the named block file.
@@ -5302,7 +5229,8 @@ func (mb *msgBlock) ensureRawBytesLoaded() error {
// Sync msg and index files as needed. This is called from a timer.
func (fs *fileStore) syncBlocks() {
fs.mu.RLock()
if fs.closed {
// If closed or a snapshot is in progress bail.
if fs.closed || fs.sips > 0 {
fs.mu.RUnlock()
return
}
@@ -6786,6 +6714,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
var smv StoreMsg
var tombs []msgId
fs.mu.Lock()
// We may remove blocks as we purge, so don't range directly on fs.blks
@@ -6839,9 +6768,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
purged++
bytes += rl
}
// FSS updates.
// PSIM and FSS updates.
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
// Track tombstones we need to write.
tombs = append(tombs, msgId{sm.seq, sm.ts})
// Check for first message.
if seq == atomic.LoadUint64(&mb.first.seq) {
@@ -6880,7 +6811,16 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
if firstSeqNeedsUpdate {
fs.selectNextFirst()
}
fseq := fs.state.FirstSeq
// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq > fseq {
fs.lmb.writeTombstone(tomb.seq, tomb.ts)
}
}
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++
cb := fs.scb
fs.mu.Unlock()
@@ -6923,7 +6863,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
fs.bim = make(map[uint32]*msgBlock)
// Clear any per subject tracking.
fs.psim, fs.tsl = fs.psim.Empty(), 0
// Mark dirty
// Mark dirty.
fs.dirty++
// Move the msgs directory out of the way, will delete out of band.
@@ -6979,6 +6919,11 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
cb := fs.scb
fs.mu.Unlock()
// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}
if cb != nil {
cb(-int64(purged), -rbytes, 0, _EMPTY_)
}
@@ -7173,11 +7118,19 @@ SKIP:
}
fs.state.Bytes -= bytes
// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++
cb := fs.scb
fs.mu.Unlock()
// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}
if cb != nil && purged > 0 {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
@@ -7238,6 +7191,40 @@ func (fs *fileStore) reset() error {
return nil
}
// Return all active tombstones in this msgBlock.
// Write lock should be held.
func (mb *msgBlock) tombs() []msgId {
var tombs []msgId
if !mb.cacheAlreadyLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil
}
}
var le = binary.LittleEndian
buf := mb.cache.buf
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
return tombs
}
hdr := buf[index : index+msgHdrSize]
rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:])
// Clear any headers bit that could be set.
rl &^= hbit
// Check for tombstones.
if seq&tbit != 0 {
ts := int64(le.Uint64(hdr[12:]))
tombs = append(tombs, msgId{seq &^ tbit, ts})
}
// Advance to next record.
index += rl
}
return tombs
}
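tombs() identifies tombstone records by a flag bit on the stored sequence and strips a headers bit from the record length. A sketch of that header decoding; the hbit/tbit values here are assumptions for illustration, not the verified constants from filestore.go:

package main

import "fmt"

// Assumed flag values for illustration only; the real constants are
// defined elsewhere in filestore.go and may differ.
const (
	hbit uint32 = 1 << 31 // set on the record length when headers are present
	tbit uint64 = 1 << 62 // set on the sequence to mark a tombstone
)

// decode mirrors the header handling in tombs(): strip the headers bit
// from the length, and detect/strip the tombstone bit on the sequence.
func decode(rl uint32, seq uint64) (recLen uint32, isTomb bool, realSeq uint64) {
	recLen = rl &^ hbit
	if seq&tbit != 0 {
		return recLen, true, seq &^ tbit
	}
	return recLen, false, seq
}

func main() {
	rl, tomb, seq := decode(22|hbit, 7|tbit)
	fmt.Println(rl, tomb, seq) // 22 true 7
}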
// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
func (fs *fileStore) Truncate(seq uint64) error {
// Check for request to reset.
@@ -7273,6 +7260,10 @@ func (fs *fileStore) Truncate(seq uint64) error {
fs.mu.Unlock()
return err
}
// Collect all tombstones; we want to put these back so we can properly
// survive a restore without index.db.
var tombs []msgId
tombs = append(tombs, nlmb.tombs()...)
var purged, bytes uint64
@@ -7290,6 +7281,8 @@ func (fs *fileStore) Truncate(seq uint64) error {
getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] }
for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() {
mb.mu.Lock()
// We do this to load tombs.
tombs = append(tombs, mb.tombs()...)
purged += mb.msgs
bytes += mb.bytes
fs.removeMsgBlock(mb)
@@ -7312,11 +7305,29 @@ func (fs *fileStore) Truncate(seq uint64) error {
// Reset our subject lookup info.
fs.resetGlobalPerSubjectInfo()
// Always create new write block.
fs.newMsgBlockForWrite()
// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq <= lsm.seq {
fs.lmb.writeTombstone(tomb.seq, tomb.ts)
}
}
// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++
cb := fs.scb
fs.mu.Unlock()
// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}
if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
@@ -8138,26 +8149,6 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
msgPre := msgDir + "/"
var bbuf []byte
const minLen = 32
sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen {
if fs.aek != nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil)
if err == nil {
// Redo hash checksum at end on plaintext.
fs.mu.Lock()
hh.Reset()
hh.Write(buf)
buf = fs.hh.Sum(buf)
fs.mu.Unlock()
}
}
if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil {
return
}
}
// Now do messages themselves.
for _, mb := range blks {
if mb.pendingWriteSize() > 0 {
@@ -8196,6 +8187,30 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
}
}
// Do index.db last. We will force a write as well.
// Write out full state as well before proceeding.
if err := fs.forceWriteFullState(); err == nil {
const minLen = 32
sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen {
if fs.aek != nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil)
if err == nil {
// Redo hash checksum at end on plaintext.
fs.mu.Lock()
hh.Reset()
hh.Write(buf)
buf = fs.hh.Sum(buf)
fs.mu.Unlock()
}
}
if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil {
return
}
}
}
// Bail if no consumers requested.
if !includeConsumers {
return
@@ -8268,9 +8283,6 @@ func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumer
}
}
// Write out full state as well before proceeding.
fs.writeFullState()
pr, pw := net.Pipe()
// Set a write deadline here to protect ourselves.

vendor/github.com/nats-io/nats-server/v2/server/jetstream.go

@@ -105,6 +105,7 @@ type jetStream struct {
storeReserved int64
memUsed int64
storeUsed int64
queueLimit int64
clustered int32
mu sync.RWMutex
srv *Server
@@ -377,6 +378,9 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
}
s.gcbMu.Unlock()
// TODO: Not currently reloadable.
atomic.StoreInt64(&js.queueLimit, s.getOpts().JetStreamRequestQueueLimit)
s.js.Store(js)
// FIXME(dlc) - Allow memory only operation?

vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go

@@ -299,6 +299,9 @@ const (
// JSAdvisoryServerRemoved notification that a server has been removed from the system.
JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
// JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
// JSAuditAdvisory is a notification about JetStream API access.
// FIXME - Add in details about who..
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
@@ -346,6 +349,10 @@ const JSMaxMetadataLen = 128 * 1024
// Picked 255 as it seems to be a widely used file name limit
const JSMaxNameLen = 255
// JSDefaultRequestQueueLimit is the default number of entries that we will
// put on the global request queue before we react.
const JSDefaultRequestQueueLimit = 10_000
// Responses for API calls.
// ApiResponse is a standard response from the JetStream JSON API
@@ -825,10 +832,22 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
const warnThresh = 128
pending := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if pending >= warnThresh {
s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending)
limit := atomic.LoadInt64(&js.queueLimit)
if pending >= int(limit) {
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
s.jsAPIRoutedReqs.drain()
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
Type: JSAPILimitReachedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Server: s.Name(),
Domain: js.config.Domain,
Dropped: int64(pending),
})
}
}
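Once pending reaches the configured limit, the queue is drained and the advisory defined for JSAdvisoryAPILimitReached is published. A hedged nats.go sketch for observing that advisory (the URL is a placeholder, and the connection must be authorized to read system events):

package main

import (
	"fmt"

	"github.com/nats-io/nats.go"
)

func main() {
	// Placeholder URL; adjust for your deployment.
	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		panic(err)
	}
	defer nc.Drain()

	// Subject from JSAdvisoryAPILimitReached above.
	_, err = nc.Subscribe("$JS.EVENT.ADVISORY.API.LIMIT_REACHED", func(m *nats.Msg) {
		fmt.Printf("JS API limit advisory: %s\n", m.Data)
	})
	if err != nil {
		panic(err)
	}
	select {} // keep the subscriber alive
}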
@@ -2745,6 +2764,12 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
return
}
// This should only be coming from the System Account.
if acc != s.SystemAccount() {
s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
return
}
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
return

vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go

@@ -583,10 +583,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
return false
}
s := js.srv
js.mu.RUnlock()
// Capture RAFT node from assignment.
node := ca.Group.node
js.mu.RUnlock()
// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
@@ -855,7 +854,6 @@ func (js *jetStream) setupMetaGroup() error {
}
c := s.createInternalJetStreamClient()
sacc := s.SystemAccount()
js.mu.Lock()
defer js.mu.Unlock()
@@ -867,7 +865,7 @@ func (js *jetStream) setupMetaGroup() error {
qch: make(chan struct{}),
}
atomic.StoreInt32(&js.clustered, 1)
c.registerWithAccount(sacc)
c.registerWithAccount(sysAcc)
// Set to true before we start.
js.metaRecovering = true
@@ -875,7 +873,7 @@ func (js *jetStream) setupMetaGroup() error {
js.monitorCluster,
pprofLabels{
"type": "metaleader",
"account": sacc.Name,
"account": sysAcc.Name,
},
)
return nil
@@ -1039,7 +1037,7 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool {
ourID := cc.meta.ID()
for _, peer := range rg.Peers {
if peer == ourID {
if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() {
if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) {
return true
}
}
@@ -1201,7 +1199,12 @@ func (js *jetStream) checkForOrphans() {
stream = mset.cfg.Name
mset.mu.RUnlock()
}
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
if o.isDurable() {
s.Warnf("Detected orphaned durable consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
} else {
s.Debugf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
}
if err := o.delete(); err != nil {
s.Warnf("Deleting consumer encountered an error: %v", err)
}
@@ -3792,7 +3795,8 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
}
mset.setStreamAssignment(sa)
// Check if our config has really been updated.
if !reflect.DeepEqual(mset.config(), sa.Config) {
cfg := mset.config()
if !reflect.DeepEqual(&cfg, sa.Config) {
if err = mset.updateWithAdvisory(sa.Config, false); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
if osa != nil {
@@ -6331,8 +6335,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
}
if isReplicaChange {
isScaleUp := newCfg.Replicas > len(rg.Peers)
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
if isScaleUp {
// Check that we have the allocation available.
if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil {
resp.Error = err
@@ -6408,22 +6413,82 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
// Need to remap any consumers.
for _, ca := range osa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy.
// Legacy ephemerals are R=1 but present as R=0, so only auto-remap named consumers, or if we are downsizing the consumer peers.
// If stream is interest or workqueue policy always remaps since they require peer parity with stream.
numPeers := len(ca.Group.Peers)
if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy {
isAutoScale := ca.Config.Replicas == 0 && (ca.Config.Durable != _EMPTY_ || ca.Config.Name != _EMPTY_)
if isAutoScale || numPeers > len(rg.Peers) || cfg.Retention != LimitsPolicy {
cca := ca.copyGroup()
// Adjust preferred as needed.
if numPeers == 1 && len(rg.Peers) > 1 {
if numPeers == 1 && isScaleUp {
cca.Group.Preferred = ca.Group.Peers[0]
} else {
cca.Group.Preferred = _EMPTY_
}
// Assign new peers.
cca.Group.Peers = rg.Peers
// If the replicas was not 0 make sure it matches here.
if cca.Config.Replicas != 0 {
cca.Config.Replicas = len(rg.Peers)
}
// We can not propose here before the stream itself so we collect them.
consumers = append(consumers, cca)
} else if !isScaleUp {
// We decided to leave this consumer's peer group alone but we are also scaling down.
// We need to make sure we do not have any peers that are no longer part of the stream.
// Note we handle down scaling of a consumer above if its number of peers were > new stream peers.
var needReplace []string
for _, rp := range ca.Group.Peers {
// Check if we have an orphaned peer now for this consumer.
if !rg.isMember(rp) {
needReplace = append(needReplace, rp)
}
}
if len(needReplace) > 0 {
newPeers := copyStrings(rg.Peers)
rand.Shuffle(len(newPeers), func(i, j int) { newPeers[i], newPeers[j] = newPeers[j], newPeers[i] })
// If we had a smaller size than the peer set, restrict to the same number.
if lp := len(ca.Group.Peers); lp < len(newPeers) {
newPeers = newPeers[:lp]
}
cca := ca.copyGroup()
// Assign new peers.
cca.Group.Peers = newPeers
// If the replicas was not 0 make sure it matches here.
if cca.Config.Replicas != 0 {
cca.Config.Replicas = len(newPeers)
}
// Check if all peers are invalid. This can happen with R1 under replicated streams that are being scaled down.
if len(needReplace) == len(ca.Group.Peers) {
// We have to transfer state to new peers.
// we will grab our state and attach to the new assignment.
// TODO(dlc) - In practice we would want to make sure the consumer is paused.
// Need to release js lock.
js.mu.Unlock()
if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, acc, osa.Config.Name, ca.Name); err != nil {
s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, osa.Config.Name, ca.Name, err)
} else if ci != nil {
cca.State = &ConsumerState{
Delivered: SequencePair{
Consumer: ci.Delivered.Consumer,
Stream: ci.Delivered.Stream,
},
AckFloor: SequencePair{
Consumer: ci.AckFloor.Consumer,
Stream: ci.AckFloor.Stream,
},
}
}
// Re-acquire here.
js.mu.Lock()
}
// We can not propose here before the stream itself so we collect them.
consumers = append(consumers, cca)
}
}
}
} else if isMoveRequest {
if len(peerSet) == 0 {
nrg, err := js.createGroupForStream(ci, newCfg)

vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go

@@ -283,3 +283,14 @@ type JSServerRemovedAdvisory struct {
Cluster string `json:"cluster"`
Domain string `json:"domain,omitempty"`
}
// JSAPILimitReachedAdvisoryType is sent when the JS API request queue limit is reached.
const JSAPILimitReachedAdvisoryType = "io.nats.jetstream.advisory.v1.api_limit_reached"
// JSAPILimitReachedAdvisory is an advisory published when JetStream hits the queue length limit.
type JSAPILimitReachedAdvisory struct {
TypedEvent
Server string `json:"server"` // Server that created the event, name or ID
Domain string `json:"domain,omitempty"` // Domain the server belongs to
Dropped int64 `json:"dropped"` // How many messages did we drop from the queue
}

vendor/github.com/nats-io/nats-server/v2/server/monitor.go

@@ -1462,6 +1462,9 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
if ci.Leader == s.info.Name {
v.Meta.Replicas = ci.Replicas
}
if ipq := s.jsAPIRoutedReqs; ipq != nil {
v.Meta.Pending = ipq.len()
}
}
}
}
@@ -2791,6 +2794,7 @@ type MetaClusterInfo struct {
Peer string `json:"peer,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
Size int `json:"cluster_size"`
Pending int `json:"pending"`
}
// JSInfo has detailed information on JetStream.
@@ -2990,6 +2994,9 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
if isLeader {
jsi.Meta.Replicas = ci.Replicas
}
if ipq := s.jsAPIRoutedReqs; ipq != nil {
jsi.Meta.Pending = ipq.len()
}
}
}
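The new Pending field surfaces through the /jsz monitoring endpoint under the meta cluster section. A hedged sketch of reading it over HTTP; port 8222 assumes monitoring is enabled, and the JSON nesting is inferred from the struct tags above:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Assumes the server was started with monitoring on port 8222.
	resp, err := http.Get("http://127.0.0.1:8222/jsz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var jsz struct {
		Meta struct {
			Pending int `json:"pending"`
		} `json:"meta"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&jsz); err != nil {
		panic(err)
	}
	fmt.Println("routed JS API requests pending:", jsz.Meta.Pending)
}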

vendor/github.com/nats-io/nats-server/v2/server/opts.go

@@ -251,86 +251,87 @@ type AuthCallout struct {
// NOTE: This structure is no longer used for monitoring endpoints
// and json tags are deprecated and may be removed in the future.
type Options struct {
ConfigFile string `json:"-"`
ServerName string `json:"server_name"`
Host string `json:"addr"`
Port int `json:"port"`
DontListen bool `json:"dont_listen"`
ClientAdvertise string `json:"-"`
Trace bool `json:"-"`
Debug bool `json:"-"`
TraceVerbose bool `json:"-"`
NoLog bool `json:"-"`
NoSigs bool `json:"-"`
NoSublistCache bool `json:"-"`
NoHeaderSupport bool `json:"-"`
DisableShortFirstPing bool `json:"-"`
Logtime bool `json:"-"`
LogtimeUTC bool `json:"-"`
MaxConn int `json:"max_connections"`
MaxSubs int `json:"max_subscriptions,omitempty"`
MaxSubTokens uint8 `json:"-"`
Nkeys []*NkeyUser `json:"-"`
Users []*User `json:"-"`
Accounts []*Account `json:"-"`
NoAuthUser string `json:"-"`
SystemAccount string `json:"-"`
NoSystemAccount bool `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
Authorization string `json:"-"`
AuthCallout *AuthCallout `json:"-"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPBasePath string `json:"http_base_path"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int32 `json:"max_control_line"`
MaxPayload int32 `json:"max_payload"`
MaxPending int64 `json:"max_pending"`
Cluster ClusterOpts `json:"cluster,omitempty"`
Gateway GatewayOpts `json:"gateway,omitempty"`
LeafNode LeafNodeOpts `json:"leaf,omitempty"`
JetStream bool `json:"jetstream"`
JetStreamMaxMemory int64 `json:"-"`
JetStreamMaxStore int64 `json:"-"`
JetStreamDomain string `json:"-"`
JetStreamExtHint string `json:"-"`
JetStreamKey string `json:"-"`
JetStreamOldKey string `json:"-"`
JetStreamCipher StoreCipher `json:"-"`
JetStreamUniqueTag string
JetStreamLimits JSLimitOpts
JetStreamMaxCatchup int64
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping
Websocket WebsocketOpts `json:"-"`
MQTT MQTTOpts `json:"-"`
ProfPort int `json:"-"`
ProfBlockRate int `json:"-"`
PidFile string `json:"-"`
PortsFileDir string `json:"-"`
LogFile string `json:"-"`
LogSizeLimit int64 `json:"-"`
LogMaxFiles int64 `json:"-"`
Syslog bool `json:"-"`
RemoteSyslog string `json:"-"`
Routes []*url.URL `json:"-"`
RoutesStr string `json:"-"`
TLSTimeout float64 `json:"tls_timeout"`
TLS bool `json:"-"`
TLSVerify bool `json:"-"`
TLSMap bool `json:"-"`
TLSCert string `json:"-"`
TLSKey string `json:"-"`
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
TLSPinnedCerts PinnedCertSet `json:"-"`
TLSRateLimit int64 `json:"-"`
ConfigFile string `json:"-"`
ServerName string `json:"server_name"`
Host string `json:"addr"`
Port int `json:"port"`
DontListen bool `json:"dont_listen"`
ClientAdvertise string `json:"-"`
Trace bool `json:"-"`
Debug bool `json:"-"`
TraceVerbose bool `json:"-"`
NoLog bool `json:"-"`
NoSigs bool `json:"-"`
NoSublistCache bool `json:"-"`
NoHeaderSupport bool `json:"-"`
DisableShortFirstPing bool `json:"-"`
Logtime bool `json:"-"`
LogtimeUTC bool `json:"-"`
MaxConn int `json:"max_connections"`
MaxSubs int `json:"max_subscriptions,omitempty"`
MaxSubTokens uint8 `json:"-"`
Nkeys []*NkeyUser `json:"-"`
Users []*User `json:"-"`
Accounts []*Account `json:"-"`
NoAuthUser string `json:"-"`
SystemAccount string `json:"-"`
NoSystemAccount bool `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
Authorization string `json:"-"`
AuthCallout *AuthCallout `json:"-"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPBasePath string `json:"http_base_path"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int32 `json:"max_control_line"`
MaxPayload int32 `json:"max_payload"`
MaxPending int64 `json:"max_pending"`
Cluster ClusterOpts `json:"cluster,omitempty"`
Gateway GatewayOpts `json:"gateway,omitempty"`
LeafNode LeafNodeOpts `json:"leaf,omitempty"`
JetStream bool `json:"jetstream"`
JetStreamMaxMemory int64 `json:"-"`
JetStreamMaxStore int64 `json:"-"`
JetStreamDomain string `json:"-"`
JetStreamExtHint string `json:"-"`
JetStreamKey string `json:"-"`
JetStreamOldKey string `json:"-"`
JetStreamCipher StoreCipher `json:"-"`
JetStreamUniqueTag string
JetStreamLimits JSLimitOpts
JetStreamMaxCatchup int64
JetStreamRequestQueueLimit int64
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping
Websocket WebsocketOpts `json:"-"`
MQTT MQTTOpts `json:"-"`
ProfPort int `json:"-"`
ProfBlockRate int `json:"-"`
PidFile string `json:"-"`
PortsFileDir string `json:"-"`
LogFile string `json:"-"`
LogSizeLimit int64 `json:"-"`
LogMaxFiles int64 `json:"-"`
Syslog bool `json:"-"`
RemoteSyslog string `json:"-"`
Routes []*url.URL `json:"-"`
RoutesStr string `json:"-"`
TLSTimeout float64 `json:"tls_timeout"`
TLS bool `json:"-"`
TLSVerify bool `json:"-"`
TLSMap bool `json:"-"`
TLSCert string `json:"-"`
TLSKey string `json:"-"`
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
TLSPinnedCerts PinnedCertSet `json:"-"`
TLSRateLimit int64 `json:"-"`
// When set to true, the server will perform the TLS handshake before
// sending the INFO protocol. For clients that are not configured
// with a similar option, their connection will fail with some sort
@@ -675,6 +676,7 @@ type TLSConfigOpts struct {
CertMatch string
OCSPPeerConfig *certidp.OCSPPeerConfig
Certificates []*TLSCertPairOpt
MinVersion uint16
}
// TLSCertPairOpt are the paths to a certificate and private key.
@@ -1195,6 +1197,22 @@ func (o *Options) processConfigFileLine(k string, v any, errors *[]error, warnin
opFiles = append(opFiles, v)
case []string:
opFiles = append(opFiles, v...)
case []any:
for _, t := range v {
if token, ok := t.(token); ok {
if v, ok := token.Value().(string); ok {
opFiles = append(opFiles, v)
} else {
err := &configErr{tk, fmt.Sprintf("error parsing operators: unsupported type %T where string is expected", token)}
*errors = append(*errors, err)
break
}
} else {
err := &configErr{tk, fmt.Sprintf("error parsing operators: unsupported type %T", t)}
*errors = append(*errors, err)
break
}
}
default:
err := &configErr{tk, fmt.Sprintf("error parsing operators: unsupported type %T", v)}
*errors = append(*errors, err)
@@ -2218,6 +2236,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
}
opts.JetStreamMaxCatchup = s
case "request_queue_limit":
lim, ok := mv.(int64)
if !ok {
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
opts.JetStreamRequestQueueLimit = lim
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -4214,6 +4238,24 @@ func parseCurvePreferences(curveName string) (tls.CurveID, error) {
return curve, nil
}
func parseTLSVersion(v any) (uint16, error) {
var tlsVersionNumber uint16
switch v := v.(type) {
case string:
n, err := tlsVersionFromString(v)
if err != nil {
return 0, err
}
tlsVersionNumber = n
default:
return 0, fmt.Errorf("'min_version' wrong type: %v", v)
}
if tlsVersionNumber < tls.VersionTLS12 {
return 0, fmt.Errorf("unsupported TLS version: %s", tls.VersionName(tlsVersionNumber))
}
return tlsVersionNumber, nil
}
// Helper function to parse TLS configs.
func parseTLS(v any, isClientCtx bool) (t *TLSConfigOpts, retErr error) {
var (
@@ -4457,6 +4499,12 @@ func parseTLS(v any, isClientCtx bool) (t *TLSConfigOpts, retErr error) {
}
tc.Certificates[i] = certPair
}
case "min_version":
minVersion, err := parseTLSVersion(mv)
if err != nil {
return nil, &configErr{tk, fmt.Sprintf("error parsing tls config: %v", err)}
}
tc.MinVersion = minVersion
default:
return nil, &configErr{tk, fmt.Sprintf("error parsing tls config, unknown field %q", mk)}
}
@@ -4808,6 +4856,13 @@ func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error) {
}
config.ClientCAs = pool
}
// Allow setting TLS minimum version.
if tc.MinVersion > 0 {
if tc.MinVersion < tls.VersionTLS12 {
return nil, fmt.Errorf("unsupported minimum TLS version: %s", tls.VersionName(tc.MinVersion))
}
config.MinVersion = tc.MinVersion
}
return &config, nil
}
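For reference, min_version strings such as "1.2" resolve to the crypto/tls version constants, and anything below TLS 1.2 is rejected both at parse time and in GenTLSConfig. A minimal sketch of the equivalent direct tls.Config setting:

package main

import (
	"crypto/tls"
	"fmt"
)

func main() {
	// What a `min_version: "1.2"` TLS config entry resolves to; values
	// below 1.2 are rejected by the parsing added above.
	cfg := &tls.Config{MinVersion: tls.VersionTLS12}
	fmt.Println(tls.VersionName(cfg.MinVersion)) // TLS 1.2
}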
@@ -5177,6 +5232,9 @@ func setBaselineOptions(opts *Options) {
if opts.SyncInterval == 0 && !opts.syncSet {
opts.SyncInterval = defaultSyncInterval
}
if opts.JetStreamRequestQueueLimit <= 0 {
opts.JetStreamRequestQueueLimit = JSDefaultRequestQueueLimit
}
}
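With that default in place, JetStreamRequestQueueLimit falls back to JSDefaultRequestQueueLimit (10_000) when unset. When embedding the server, the option can also be set directly on Options; a sketch, with the store directory as a placeholder:

package main

import (
	"time"

	"github.com/nats-io/nats-server/v2/server"
)

func main() {
	opts := &server.Options{
		JetStream: true,
		StoreDir:  "/tmp/js-queue-limit-demo", // placeholder store dir
		// Raise the JS API request queue limit above the 10_000 default.
		JetStreamRequestQueueLimit: 50_000,
	}
	ns, err := server.NewServer(opts)
	if err != nil {
		panic(err)
	}
	go ns.Start()
	if !ns.ReadyForConnections(5 * time.Second) {
		panic("nats-server did not start")
	}
	defer ns.Shutdown()
}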
func getDefaultAuthTimeout(tls *tls.Config, tlsTimeout float64) float64 {

vendor/github.com/nats-io/nats-server/v2/server/reload.go

@@ -1150,7 +1150,7 @@ func imposeOrder(value any) error {
slices.SortFunc(value.Gateways, func(i, j *RemoteGatewayOpts) int { return cmp.Compare(i.Name, j.Name) })
case WebsocketOpts:
slices.Sort(value.AllowedOrigins)
case string, bool, uint8, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet,
case string, bool, uint8, uint16, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet,
*URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList,
*OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig:
// explicitly skipped types

vendor/github.com/nats-io/nats-server/v2/server/server.go

@@ -1705,7 +1705,7 @@ func (s *Server) setSystemAccount(acc *Account) error {
recvqp: newIPQueue[*inSysMsg](s, "System recvQ Pings"),
resetCh: make(chan struct{}),
sq: s.newSendQ(),
statsz: eventsHBInterval,
statsz: statsHBInterval,
orphMax: 5 * eventsHBInterval,
chkOrph: 3 * eventsHBInterval,
}
@@ -1990,9 +1990,9 @@ func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error
accClaims, _, err := s.verifyAccountClaims(claimJWT)
if err == nil && accClaims != nil {
acc.mu.Lock()
if acc.Issuer == _EMPTY_ {
acc.Issuer = accClaims.Issuer
}
// if an account is updated with a different operator signing key, we want to
// show a consistent issuer.
acc.Issuer = accClaims.Issuer
if acc.Name != accClaims.Subject {
acc.mu.Unlock()
return ErrAccountValidation
@@ -2926,8 +2926,10 @@ func (s *Server) startMonitoring(secure bool) error {
}
hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
config := opts.TLSConfig.Clone()
config.GetConfigForClient = s.getMonitoringTLSConfig
config.ClientAuth = tls.NoClientCert
if !s.ocspPeerVerify {
config.GetConfigForClient = s.getMonitoringTLSConfig
config.ClientAuth = tls.NoClientCert
}
httpListener, err = tls.Listen("tcp", hp, config)
} else {
@@ -3441,6 +3443,20 @@ func tlsVersion(ver uint16) string {
return fmt.Sprintf("Unknown [0x%x]", ver)
}
func tlsVersionFromString(ver string) (uint16, error) {
switch ver {
case "1.0":
return tls.VersionTLS10, nil
case "1.1":
return tls.VersionTLS11, nil
case "1.2":
return tls.VersionTLS12, nil
case "1.3":
return tls.VersionTLS13, nil
}
return 0, fmt.Errorf("unknown version: %v", ver)
}
// We use hex here so we don't need multiple versions
func tlsCipher(cs uint16) string {
name, present := cipherMapByID[cs]

vendor/github.com/nats-io/nats-server/v2/server/stream.go

@@ -650,9 +650,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
mset.store.FastState(&state)
// Possible race with consumer.setLeader during recovery.
mset.mu.RLock()
mset.mu.Lock()
mset.lseq = state.LastSeq
mset.mu.RUnlock()
mset.mu.Unlock()
// If no msgs (new stream), set dedupe state loaded to true.
if state.Msgs == 0 {

vendor/modules.txt

@@ -1431,7 +1431,7 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.5.8
## explicit; go 1.18
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.10.20
# github.com/nats-io/nats-server/v2 v2.10.21
## explicit; go 1.21.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand