diff --git a/go.mod b/go.mod index c182c3da43..879eccda79 100644 --- a/go.mod +++ b/go.mod @@ -63,7 +63,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.2.1 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.10.15 + github.com/nats-io/nats-server/v2 v2.10.16 github.com/nats-io/nats.go v1.35.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 @@ -255,7 +255,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/juliangruber/go-intersect v1.1.0 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.17.7 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/libregraph/oidc-go v1.1.0 // indirect @@ -282,7 +282,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/jwt/v2 v2.5.6 // indirect + github.com/nats-io/jwt/v2 v2.5.7 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/nxadm/tail v1.4.8 // indirect diff --git a/go.sum b/go.sum index 9cab1edfcc..3b288de134 100644 --- a/go.sum +++ b/go.sum @@ -1605,8 +1605,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= @@ -1755,10 +1755,10 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= -github.com/nats-io/jwt/v2 v2.5.6 h1:Cp618+z4q042sWqHiSoIHFT08OZtAskui0hTmRfmGGQ= -github.com/nats-io/jwt/v2 v2.5.6/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.15 h1:O/l+ZT91ltMiiRJKjWLQJcGg7ypzjlb/bC5bFIRVw3M= -github.com/nats-io/nats-server/v2 v2.10.15/go.mod h1:ul+pGt5I7e4U+nI09ZFDG4vqM+6Ce2Tou7UbVSnLiIw= +github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c= +github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0= +github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU= github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk= github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= diff --git a/vendor/github.com/klauspost/compress/s2/writer.go b/vendor/github.com/klauspost/compress/s2/writer.go index 1253ea675c..637c931474 100644 --- a/vendor/github.com/klauspost/compress/s2/writer.go +++ b/vendor/github.com/klauspost/compress/s2/writer.go @@ -937,7 +937,7 @@ func WriterUncompressed() WriterOption { // WriterBlockSize allows to override the default block size. // Blocks will be this size or smaller. -// Minimum size is 4KB and and maximum size is 4MB. +// Minimum size is 4KB and maximum size is 4MB. // // Bigger blocks may give bigger throughput on systems with many cores, // and will increase compression slightly, but it will limit the possible diff --git a/vendor/github.com/nats-io/jwt/v2/user_claims.go b/vendor/github.com/nats-io/jwt/v2/user_claims.go index 0b38af69e9..53b781dbd4 100644 --- a/vendor/github.com/nats-io/jwt/v2/user_claims.go +++ b/vendor/github.com/nats-io/jwt/v2/user_claims.go @@ -29,6 +29,7 @@ const ( ConnectionTypeLeafnodeWS = "LEAFNODE_WS" ConnectionTypeMqtt = "MQTT" ConnectionTypeMqttWS = "MQTT_WS" + ConnectionTypeInProcess = "IN_PROCESS" ) type UserPermissionLimits struct { diff --git a/vendor/github.com/nats-io/nats-server/v2/conf/lex.go b/vendor/github.com/nats-io/nats-server/v2/conf/lex.go index 278fab0406..80a9a1147e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/conf/lex.go +++ b/vendor/github.com/nats-io/nats-server/v2/conf/lex.go @@ -332,12 +332,12 @@ func lexBlockStart(lx *lexer) stateFn { lx.ignore() return lx.pop() case commentHashStart: - lx.push(lexBlockEnd) + lx.push(lexBlockStart) return lexCommentStart case commentSlashStart: rn := lx.next() if rn == commentSlashStart { - lx.push(lexBlockEnd) + lx.push(lexBlockStart) return lexCommentStart } lx.backup() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index 5e20f301e2..0e8540f091 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.10.15" + VERSION = "2.10.16" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index d75f6bb66a..34cac17007 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -2748,7 +2748,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b return } - var sagap uint64 + var sgap, floor uint64 var needSignal bool switch o.cfg.AckPolicy { @@ -2792,12 +2792,29 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true } - sagap = sseq - o.asflr + sgap = sseq - o.asflr + floor = sgap // start at same and set lower as we go. o.adflr, o.asflr = dseq, sseq - for seq := sseq; seq > sseq-sagap; seq-- { + + remove := func(seq uint64) { delete(o.pending, seq) delete(o.rdc, seq) o.removeFromRedeliverQueue(seq) + if seq < floor { + floor = seq + } + } + // Determine if smarter to walk all of pending vs the sequence range. + if sgap > uint64(len(o.pending)) { + for seq := range o.pending { + if seq <= sseq { + remove(seq) + } + } + } else { + for seq := sseq; seq > sseq-sgap && len(o.pending) > 0; seq-- { + remove(seq) + } } case AckNone: // FIXME(dlc) - This is error but do we care? @@ -2808,20 +2825,19 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b // Update underlying store. o.updateAcks(dseq, sseq, reply) - clustered := o.node != nil - // In case retention changes for a stream, this ought to have been updated // using the consumer lock to avoid a race. retention := o.retention + clustered := o.node != nil o.mu.Unlock() // Let the owning stream know if we are interest or workqueue retention based. // If this consumer is clustered this will be handled by processReplicatedAck // after the ack has propagated. if !clustered && mset != nil && retention != LimitsPolicy { - if sagap > 1 { - // FIXME(dlc) - This is very inefficient, will need to fix. - for seq := sseq; seq > sseq-sagap; seq-- { + if sgap > 1 { + // FIXME(dlc) - This can very inefficient, will need to fix. + for seq := sseq; seq >= floor; seq-- { mset.ackMsg(o, seq) } } else { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go index 3e257eab77..d165b9ef20 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go @@ -632,6 +632,11 @@ func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply stri s.mu.RLock() if s.sys == nil || s.sys.sendq == nil { s.mu.RUnlock() + if s.isShuttingDown() { + // Skip in case this was called at the end phase during shut down + // to avoid too many entries in the logs. + return nil + } return ErrNoSysAccount } c := s.sys.client diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index a71237c61e..188de4aca5 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -3430,7 +3430,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { lseq := seq + num - 1 mb.mu.Lock() - var needsRecord bool // If we are empty update meta directly. if mb.msgs == 0 { atomic.StoreUint64(&mb.last.seq, lseq) @@ -3438,7 +3437,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { atomic.StoreUint64(&mb.first.seq, lseq+1) mb.first.ts = nowts } else { - needsRecord = true for ; seq <= lseq; seq++ { mb.dmap.Insert(seq) } @@ -3446,9 +3444,7 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { mb.mu.Unlock() // Write out our placeholder. - if needsRecord { - mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true) - } + mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true) // Now update FS accounting. // Update fs state. @@ -8169,6 +8165,7 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks { } // SyncDeleted will make sure this stream has same deleted state as dbs. +// This will only process deleted state within our current state. func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { if len(dbs) == 0 { return @@ -8177,18 +8174,22 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { fs.mu.Lock() defer fs.mu.Unlock() + lseq := fs.state.LastSeq var needsCheck DeleteBlocks fs.readLockAllMsgBlocks() mdbs := fs.deleteBlocks() for i, db := range dbs { + first, last, num := db.State() // If the block is same as what we have we can skip. if i < len(mdbs) { - first, last, num := db.State() eFirst, eLast, eNum := mdbs[i].State() if first == eFirst && last == eLast && num == eNum { continue } + } else if first > lseq { + // Skip blocks not applicable to our current state. + continue } // Need to insert these. needsCheck = append(needsCheck, db) @@ -8616,9 +8617,16 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error { sgap := sseq - o.state.AckFloor.Stream o.state.AckFloor.Consumer = dseq o.state.AckFloor.Stream = sseq - for seq := sseq; seq > sseq-sgap; seq-- { - delete(o.state.Pending, seq) - if len(o.state.Redelivered) > 0 { + if sgap > uint64(len(o.state.Pending)) { + for seq := range o.state.Pending { + if seq <= sseq { + delete(o.state.Pending, seq) + delete(o.state.Redelivered, seq) + } + } + } else { + for seq := sseq; seq > sseq-sgap && len(o.state.Pending) > 0; seq-- { + delete(o.state.Pending, seq) delete(o.state.Redelivered, seq) } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 880f9346f2..505fab1f9d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -2405,9 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // If we are interest based make sure to check consumers if interest retention policy. // This is to make sure we process any outstanding acks from all consumers. mset.checkInterestState() - // Make sure we create a new snapshot in case things have changed such that any existing - // snapshot may no longer be valid. - doSnapshot() // If we became leader during this time and we need to send a snapshot to our // followers, i.e. as a result of a scale-up from R1, do it now. if sendSnapshot && isLeader && mset != nil && n != nil { @@ -2941,6 +2938,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if err != nil { if err == errLastSeqMismatch { + var state StreamState mset.store.FastState(&state) @@ -2952,6 +2950,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // Retry err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts) } + // FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected + // and what we got. } // Only return in place if we are going to reset our stream or we are out of space, or we are closed. @@ -3568,9 +3568,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss js.mu.Unlock() } - var needsSetLeader bool if !alreadyRunning && numReplicas > 1 { if needsNode { + // Since we are scaling up we want to make sure our sync subject + // is registered before we start our raft node. + mset.mu.Lock() + mset.startClusterSubs() + mset.mu.Unlock() + js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{ "type": "stream", "account": mset.accName(), @@ -3602,16 +3607,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss rg.node = nil js.mu.Unlock() } + // Set the new stream assignment. + mset.setStreamAssignment(sa) + // Call update. if err = mset.updateWithAdvisory(cfg, !recovering); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err) } - // Set the new stream assignment. - mset.setStreamAssignment(sa) - // Make sure we are the leader now that we are R1. - if needsSetLeader { - mset.setLeader(true) - } } // If not found we must be expanding into this node since if we are here we know we are a member. @@ -7582,7 +7584,8 @@ func (mset *stream) supportsBinarySnapshotLocked() bool { // We know we support ourselves. continue } - if sir, ok := s.nodeToInfo.Load(p.ID); !ok || sir == nil || !sir.(nodeInfo).binarySnapshots { + // Since release 2.10.16 only deny if we know the other node does not support. + if sir, ok := s.nodeToInfo.Load(p.ID); ok && sir != nil && !sir.(nodeInfo).binarySnapshots { return false } } @@ -8681,7 +8684,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { return 0 } - nextBatchC := make(chan struct{}, 1) + nextBatchC := make(chan struct{}, 4) nextBatchC <- struct{}{} remoteQuitCh := make(chan struct{}) @@ -8706,19 +8709,18 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // Kick ourselves and anyone else who might have stalled on global state. select { case nextBatchC <- struct{}{}: - // Reset our activity - notActive.Reset(activityInterval) default: } + // Reset our activity + notActive.Reset(activityInterval) }) defer s.sysUnsubscribe(ackSub) ackReplyT := strings.ReplaceAll(ackReply, ".*", ".%d") // Grab our state. var state StreamState - mset.mu.RLock() + // mset.store never changes after being set, don't need lock. mset.store.FastState(&state) - mset.mu.RUnlock() // Reset notion of first if this request wants sequences before our starting sequence // and we would have nothing to send. If we have partial messages still need to send skips for those. @@ -8756,7 +8758,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // Wait til we can send at least 4k const minBatchWait = int32(4 * 1024) mw := time.NewTimer(minWait) - for done := false; !done; { + for done := maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait; !done; { select { case <-nextBatchC: done = maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait @@ -8811,9 +8813,33 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { dr.First, dr.Num = 0, 0 } + // See if we should use LoadNextMsg instead of walking sequence by sequence if we have an order magnitude more interior deletes. + // Only makes sense with delete range capabilities. + useLoadNext := drOk && (uint64(state.NumDeleted) > 10*state.Msgs) + var smv StoreMsg for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ { - sm, err := mset.store.LoadMsg(seq, &smv) + var sm *StoreMsg + var err error + // Is we should use load next do so here. + if useLoadNext { + var nseq uint64 + sm, nseq, err = mset.store.LoadNextMsg(fwcs, true, seq, &smv) + if err == nil && nseq > seq { + dr.First, dr.Num = seq, nseq-seq + // Jump ahead + seq = nseq + } else if err == ErrStoreEOF { + dr.First, dr.Num = seq, state.LastSeq-seq + // Clear EOF here for normal processing. + err = nil + // Jump ahead + seq = state.LastSeq + } + } else { + sm, err = mset.store.LoadMsg(seq, &smv) + } + // if this is not a deleted msg, bail out. if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg { if err == ErrStoreEOF { @@ -8829,6 +8855,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { if n := mset.raftNode(); n != nil { n.InstallSnapshot(mset.stateSnapshot()) } + // If we allow gap markers check if we have one pending. + if drOk && dr.First > 0 { + sendDR() + } // Signal EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) return false @@ -8875,6 +8905,9 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { } // Recheck our exit condition. if seq == last { + if drOk && dr.First > 0 { + sendDR() + } s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) // EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) @@ -8890,7 +8923,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { if drOk && dr.First > 0 { sendDR() } - return true } @@ -8930,6 +8962,11 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { mset.clearCatchupPeer(sreq.Peer) return } + case <-time.After(500 * time.Millisecond): + if !sendNextBatchAndContinue(qch) { + mset.clearCatchupPeer(sreq.Peer) + return + } } } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index 3da696bcc6..19560b04da 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -1402,19 +1402,25 @@ func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) { // SyncDeleted will make sure this stream has same deleted state as dbs. func (ms *memStore) SyncDeleted(dbs DeleteBlocks) { + ms.mu.Lock() + defer ms.mu.Unlock() + // For now we share one dmap, so if we have one entry here check if states are the same. // Note this will work for any DeleteBlock type, but we expect this to be a dmap too. if len(dbs) == 1 { - ms.mu.RLock() min, max, num := ms.dmap.State() - ms.mu.RUnlock() if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num { return } } + lseq := ms.state.LastSeq for _, db := range dbs { - db.Range(func(dseq uint64) bool { - ms.RemoveMsg(dseq) + // Skip if beyond our current state. + if first, _, _ := db.State(); first > lseq { + continue + } + db.Range(func(seq uint64) bool { + ms.removeMsg(seq, false) return true }) } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 975ebd068b..e762417754 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -887,8 +887,10 @@ func (n *raft) ResumeApply() { } n.debug("Resuming our apply channel") - n.observer, n.pobserver = n.pobserver, false - n.paused = false + + // Reset before we start. + n.resetElectionTimeout() + // Run catchup.. if n.hcommit > n.commit { n.debug("Resuming %d replays", n.hcommit+1-n.commit) @@ -904,12 +906,16 @@ func (n *raft) ResumeApply() { runtime.Gosched() // Simply re-acquire n.Lock() - // Need to check if we got closed or if we were paused again. - if n.State() == Closed || n.paused { + // Need to check if we got closed. + if n.State() == Closed { return } } } + + // Clear our observer and paused state after we apply. + n.observer, n.pobserver = n.pobserver, false + n.paused = false n.hcommit = 0 // If we had been selected to be the next leader campaign here now that we have resumed. @@ -3352,7 +3358,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if l > paeWarnThreshold && l%paeWarnModulo == 0 { n.warn("%d append entries pending", len(n.pae)) } - } else { + } else if l%paeWarnModulo == 0 { n.debug("Not saving to append entries pending") } } else { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/server.go b/vendor/github.com/nats-io/nats-server/v2/server/server.go index 89e9c7de23..0b0ec2acf4 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/server.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/server.go @@ -932,7 +932,13 @@ func (s *Server) setClusterName(name string) { // Return whether the cluster name is dynamic. func (s *Server) isClusterNameDynamic() bool { - return s.getOpts().Cluster.Name == _EMPTY_ + // We need to lock the whole "Cluster.Name" check and not use s.getOpts() + // because otherwise this could cause a data race with setting the name in + // route.go's processRouteConnect(). + s.optsMu.RLock() + dynamic := s.opts.Cluster.Name == _EMPTY_ + s.optsMu.RUnlock() + return dynamic } // Returns our configured serverName. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index 518fb6599d..6ed1792e35 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -840,7 +840,9 @@ func (mset *stream) setLeader(isLeader bool) error { if isLeader { // Make sure we are listening for sync requests. // TODO(dlc) - Original design was that all in sync members of the group would do DQ. - mset.startClusterSubs() + if mset.isClustered() { + mset.startClusterSubs() + } // Setup subscriptions if we were not already the leader. if err := mset.subscribeToStream(); err != nil { @@ -875,7 +877,7 @@ func (mset *stream) setLeader(isLeader bool) error { // Lock should be held. func (mset *stream) startClusterSubs() { - if mset.isClustered() && mset.syncSub == nil { + if mset.syncSub == nil { mset.syncSub, _ = mset.srv.systemSubscribe(mset.sa.Sync, _EMPTY_, false, mset.sysc, mset.handleClusterSyncRequest) } } @@ -4868,6 +4870,10 @@ func (mset *stream) name() string { func (mset *stream) internalLoop() { mset.mu.RLock() + setGoRoutineLabels(pprofLabels{ + "account": mset.acc.Name, + "stream": mset.cfg.Name, + }) s := mset.srv c := s.createInternalJetStreamClient() c.registerWithAccount(mset.acc) diff --git a/vendor/modules.txt b/vendor/modules.txt index e7b56299e7..4082d1f682 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1250,7 +1250,7 @@ github.com/justinas/alice # github.com/kevinburke/ssh_config v1.2.0 ## explicit github.com/kevinburke/ssh_config -# github.com/klauspost/compress v1.17.7 +# github.com/klauspost/compress v1.17.8 ## explicit; go 1.20 github.com/klauspost/compress/flate github.com/klauspost/compress/internal/race @@ -1406,10 +1406,10 @@ github.com/mohae/deepcopy # github.com/mschoch/smat v0.2.0 ## explicit; go 1.13 github.com/mschoch/smat -# github.com/nats-io/jwt/v2 v2.5.6 +# github.com/nats-io/jwt/v2 v2.5.7 ## explicit; go 1.18 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.10.15 +# github.com/nats-io/nats-server/v2 v2.10.16 ## explicit; go 1.20 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/fastrand