From ee26fb3c6f2e8fef21341c62899986dc90315c69 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 5 Feb 2025 14:51:18 +0000 Subject: [PATCH] Bump github.com/nats-io/nats-server/v2 from 2.10.24 to 2.10.25 Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.24 to 2.10.25. - [Release notes](https://github.com/nats-io/nats-server/releases) - [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml) - [Commits](https://github.com/nats-io/nats-server/compare/v2.10.24...v2.10.25) --- updated-dependencies: - dependency-name: github.com/nats-io/nats-server/v2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 4 +- go.sum | 8 +- .../nats-io/nats-server/v2/server/const.go | 2 +- .../nats-io/nats-server/v2/server/consumer.go | 173 ++++++----- .../nats-io/nats-server/v2/server/events.go | 22 ++ .../nats-server/v2/server/filestore.go | 241 ++++++++++------ .../nats-io/nats-server/v2/server/ipqueue.go | 7 +- .../nats-server/v2/server/jetstream.go | 11 + .../nats-server/v2/server/jetstream_api.go | 34 ++- .../v2/server/jetstream_cluster.go | 269 ++++++------------ .../nats-server/v2/server/jetstream_events.go | 14 +- .../nats-io/nats-server/v2/server/leafnode.go | 32 ++- .../nats-io/nats-server/v2/server/memstore.go | 94 +++--- .../nats-io/nats-server/v2/server/raft.go | 38 ++- .../nats-io/nats-server/v2/server/route.go | 20 +- .../nats-io/nats-server/v2/server/store.go | 7 + .../nats-io/nats-server/v2/server/stream.go | 44 ++- vendor/modules.txt | 4 +- 18 files changed, 570 insertions(+), 454 deletions(-) diff --git a/go.mod b/go.mod index f46535f94d..0e6b69b188 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.10.24 + github.com/nats-io/nats-server/v2 v2.10.25 github.com/nats-io/nats.go v1.38.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 @@ -323,7 +323,7 @@ require ( go.uber.org/zap v1.23.0 // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/sys v0.29.0 // indirect - golang.org/x/time v0.8.0 // indirect + golang.org/x/time v0.9.0 // indirect golang.org/x/tools v0.28.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect diff --git a/go.sum b/go.sum index 6bfb5fcee5..84d5be6c0e 100644 --- a/go.sum +++ b/go.sum @@ -823,8 +823,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= -github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4= -github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s= +github.com/nats-io/nats-server/v2 v2.10.25 h1:J0GWLDDXo5HId7ti/lTmBfs+lzhmu8RPkoKl0eSCqwc= +github.com/nats-io/nats-server/v2 v2.10.25/go.mod h1:/YYYQO7cuoOBt+A7/8cVjuhWTaTUEAlZbJT+3sMAfFU= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= @@ -1464,8 +1464,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= -golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index 95f19ca2c7..01c951326f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -55,7 +55,7 @@ func init() { const ( // VERSION is the current version for the server. - VERSION = "2.10.24" + VERSION = "2.10.25" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index 438041ec89..4d66b23b97 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -1373,6 +1373,8 @@ func (o *consumer) setLeader(isLeader bool) { // If we were the leader make sure to drain queued up acks. if wasLeader { o.ackMsgs.drain() + // Reset amount of acks that need to be processed. + atomic.StoreInt64(&o.awl, 0) // Also remove any pending replies since we should not be the one to respond at this point. o.replies = nil } @@ -1415,8 +1417,23 @@ func (o *consumer) unsubscribe(sub *subscription) { // We need to make sure we protect access to the outq. // Do all advisory sends here. -func (o *consumer) sendAdvisory(subj string, msg []byte) { - o.outq.sendMsg(subj, msg) +func (o *consumer) sendAdvisory(subject string, e any) { + if o.acc == nil { + return + } + + // If there is no one listening for this advisory then save ourselves the effort + // and don't bother encoding the JSON or sending it. + if sl := o.acc.sl; (sl != nil && !sl.HasInterest(subject)) && !o.srv.hasGatewayInterest(o.acc.Name, subject) { + return + } + + j, err := json.Marshal(e) + if err != nil { + return + } + + o.outq.sendMsg(subject, j) } func (o *consumer) sendDeleteAdvisoryLocked() { @@ -1432,13 +1449,8 @@ func (o *consumer) sendDeleteAdvisoryLocked() { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) } func (o *consumer) sendCreateAdvisory() { @@ -1457,13 +1469,8 @@ func (o *consumer) sendCreateAdvisory() { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) } // Created returns created time. @@ -1573,6 +1580,8 @@ var ( consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval ) +// deleteNotActive must only be called from time.AfterFunc or in its own +// goroutine, as it can block on clean-up. func (o *consumer) deleteNotActive() { o.mu.Lock() if o.mset == nil { @@ -1613,8 +1622,25 @@ func (o *consumer) deleteNotActive() { s, js := o.mset.srv, o.srv.js.Load() acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct + var qch, cqch chan struct{} + if o.srv != nil { + qch = o.srv.quitCh + } + if o.js != nil { + cqch = o.js.clusterQuitC() + } o.mu.Unlock() + // Useful for pprof. + setGoRoutineLabels(pprofLabels{ + "account": acc, + "stream": stream, + "consumer": name, + }) + + // We will delete locally regardless. + defer o.delete() + // If we are clustered, check if we still have this consumer assigned. // If we do forward a proposal to delete ourselves to the metacontroller leader. if !isDirect && s.JetStreamIsClustered() { @@ -1637,38 +1663,40 @@ func (o *consumer) deleteNotActive() { if ca != nil && cc != nil { // Check to make sure we went away. // Don't think this needs to be a monitored go routine. - go func() { - jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval))) - interval := consumerNotActiveStartInterval + jitter - ticker := time.NewTicker(interval) - defer ticker.Stop() - for range ticker.C { - js.mu.RLock() - if js.shuttingDown { - js.mu.RUnlock() - return - } - nca := js.consumerAssignment(acc, stream, name) - js.mu.RUnlock() - // Make sure this is not a new consumer with the same name. - if nca != nil && nca == ca { - s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) - meta.ForwardProposal(removeEntry) - if interval < consumerNotActiveMaxInterval { - interval *= 2 - ticker.Reset(interval) - } - continue - } - // We saw that consumer has been removed, all done. + jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval))) + interval := consumerNotActiveStartInterval + jitter + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + case <-qch: + return + case <-cqch: return } - }() + js.mu.RLock() + if js.shuttingDown { + js.mu.RUnlock() + return + } + nca := js.consumerAssignment(acc, stream, name) + js.mu.RUnlock() + // Make sure this is not a new consumer with the same name. + if nca != nil && nca == ca { + s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) + meta.ForwardProposal(removeEntry) + if interval < consumerNotActiveMaxInterval { + interval *= 2 + ticker.Reset(interval) + } + continue + } + // We saw that consumer has been removed, all done. + return + } } } - - // We will delete here regardless. - o.delete() } func (o *consumer) watchGWinterest() { @@ -2382,12 +2410,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - - o.sendAdvisory(o.nakEventT, j) + o.sendAdvisory(o.nakEventT, e) // Check to see if we have delays attached. if len(nak) > len(AckNak) { @@ -2462,15 +2485,8 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - // We had an error during the marshal, so we can't send the advisory, - // but we still need to tell the caller that the ack was processed. - return ackedInPlace - } - subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) return ackedInPlace } @@ -2765,12 +2781,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - - o.sendAdvisory(o.ackEventT, j) + o.sendAdvisory(o.ackEventT, e) } // Process an ACK. @@ -2851,7 +2862,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() - return ackInPlace + // Return true to let caller respond back to the client. + return true } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true @@ -3515,12 +3527,7 @@ func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - - o.sendAdvisory(o.deliveryExcEventT, j) + o.sendAdvisory(o.deliveryExcEventT, e) } // Check if the candidate subject matches a filter if its present. @@ -3596,17 +3603,23 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { } continue } - if seq > 0 { - pmsg := getJSPubMsgFromPool() - sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) - if sm == nil || err != nil { - pmsg.returnToPool() - pmsg, dc = nil, 0 - // Adjust back deliver count. - o.decDeliveryCount(seq) - } - return pmsg, dc, err + pmsg := getJSPubMsgFromPool() + sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) + if sm == nil || err != nil { + pmsg.returnToPool() + pmsg, dc = nil, 0 + // Adjust back deliver count. + o.decDeliveryCount(seq) } + // Message was scheduled for redelivery but was removed in the meantime. + if err == ErrStoreMsgNotFound || err == errDeletedMsg { + // This is a race condition where the message is still in o.pending and + // scheduled for redelivery, but it has been removed from the stream. + // o.processTerm is called in a goroutine so could run after we get here. + // That will correct the pending state and delivery/ack floors, so just skip here. + continue + } + return pmsg, dc, err } } @@ -5379,6 +5392,7 @@ func (o *consumer) requestNextMsgSubject() string { func (o *consumer) decStreamPending(sseq uint64, subj string) { o.mu.Lock() + // Update our cached num pending only if we think deliverMsg has not done so. if sseq >= o.sseq && o.isFilteredMatch(subj) { o.npc-- @@ -5390,6 +5404,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) { if o.rdc != nil { rdc = o.rdc[sseq] } + o.mu.Unlock() // If it was pending process it like an ack. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go index 7c891b423d..7cb9feb6a7 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go @@ -324,6 +324,28 @@ func (ci *ClientInfo) forAssignmentSnap() *ClientInfo { } } +// forProposal returns the minimum amount of ClientInfo we need for assignment proposals. +func (ci *ClientInfo) forProposal() *ClientInfo { + if ci == nil { + return nil + } + cci := *ci + cci.Jwt = _EMPTY_ + cci.IssuerKey = _EMPTY_ + return &cci +} + +// forAdvisory returns the minimum amount of ClientInfo we need for JS advisory events. +func (ci *ClientInfo) forAdvisory() *ClientInfo { + if ci == nil { + return nil + } + cci := *ci + cci.Jwt = _EMPTY_ + cci.Alternates = nil + return &cci +} + // ServerStats hold various statistics that we will periodically send out. type ServerStats struct { Start time.Time `json:"start"` diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index c5920587da..4e0c28ca4a 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -495,7 +495,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Do age checks too, make sure to call in place. if fs.cfg.MaxAge != 0 { - fs.expireMsgsOnRecover() + err := fs.expireMsgsOnRecover() + if isPermissionError(err) { + return nil, err + } fs.startAgeChk() } @@ -1376,14 +1379,14 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { } hdr := buf[index : index+msgHdrSize] - rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:]) + rl, slen := le.Uint32(hdr[0:]), int(le.Uint16(hdr[20:])) hasHeaders := rl&hbit != 0 // Clear any headers bit that could be set. rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { + if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { truncate(index) return gatherLost(lbuf - index), tombstones, errBadMsg } @@ -1978,9 +1981,9 @@ func (fs *fileStore) recoverMsgs() error { // We will treat this differently in case we have a recovery // that will expire alot of messages on startup. // Should only be called on startup. -func (fs *fileStore) expireMsgsOnRecover() { +func (fs *fileStore) expireMsgsOnRecover() error { if fs.state.Msgs == 0 { - return + return nil } var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge) @@ -1992,7 +1995,7 @@ func (fs *fileStore) expireMsgsOnRecover() { // usually taken care of by fs.removeMsgBlock() but we do not call that here. var last msgId - deleteEmptyBlock := func(mb *msgBlock) { + deleteEmptyBlock := func(mb *msgBlock) error { // If we are the last keep state to remember first/last sequence. // Do this part by hand since not deleting one by one. if mb == fs.lmb { @@ -2008,8 +2011,12 @@ func (fs *fileStore) expireMsgsOnRecover() { } return true }) - mb.dirtyCloseWithRemove(true) + err := mb.dirtyCloseWithRemove(true) + if isPermissionError(err) { + return err + } deleted++ + return nil } for _, mb := range fs.blks { @@ -2023,8 +2030,11 @@ func (fs *fileStore) expireMsgsOnRecover() { if mb.last.ts <= minAge { purged += mb.msgs bytes += mb.bytes - deleteEmptyBlock(mb) + err := deleteEmptyBlock(mb) mb.mu.Unlock() + if isPermissionError(err) { + return err + } continue } @@ -2148,6 +2158,7 @@ func (fs *fileStore) expireMsgsOnRecover() { if purged > 0 { fs.dirty++ } + return nil } func copyMsgBlocks(src []*msgBlock) []*msgBlock { @@ -2315,8 +2326,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor fseq = lseq + 1 for _, subj := range subs { ss, _ := mb.fss.Find(stringToBytes(subj)) - if ss != nil && ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { + mb.recalculateForSubj(subj, ss) } if ss == nil || start > ss.Last || ss.First >= fseq { continue @@ -2445,8 +2456,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(bytesToString(bsubj), ss) } if sseq <= ss.First { update(ss) @@ -2616,10 +2627,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si // Always reset. ss.First, ss.Last, ss.Msgs = 0, 0, 0 - if filter == _EMPTY_ { - filter = fwcs - } - // We do need to figure out the first and last sequences. wc := subjectHasWildcard(filter) start, stop := uint32(math.MaxUint32), uint32(0) @@ -2749,8 +2756,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { mb.lsts = time.Now().UnixNano() mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { subj := string(bsubj) - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } oss := fss[subj] if oss.First == 0 { // New @@ -2940,8 +2947,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return } subj := bytesToString(bsubj) - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3228,8 +3235,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3467,6 +3474,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { dios <- struct{}{} if err != nil { + if isPermissionError(err) { + return nil, err + } mb.dirtyCloseWithRemove(true) return nil, fmt.Errorf("Error creating msg block file: %v", err) } @@ -3902,8 +3912,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { info.fblk = i } } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } mb.mu.Unlock() // Re-acquire fs lock @@ -4034,8 +4044,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() ss, ok := mb.fss.Find(stringToBytes(subj)) - if ok && ss != nil && ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { + mb.recalculateForSubj(subj, ss) } mb.mu.Unlock() if ss == nil { @@ -4364,12 +4374,12 @@ func (mb *msgBlock) compactWithFloor(floor uint64) { return } hdr := buf[index : index+msgHdrSize] - rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:]) + rl, slen := le.Uint32(hdr[0:]), int(le.Uint16(hdr[20:])) // Clear any headers bit that could be set. rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh || index+rl > lbuf { + if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { return } // Only need to process non-deleted messages. @@ -5473,16 +5483,23 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { <-dios tmpFD, err := os.OpenFile(tmpFN, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, defaultFilePerms) dios <- struct{}{} + if err != nil { return fmt.Errorf("failed to create temporary file: %w", err) } + errorCleanup := func(err error) error { + tmpFD.Close() + os.Remove(tmpFN) + return err + } + // The original buffer at this point is uncompressed, so we will now compress // it if needed. Note that if the selected algorithm is NoCompression, the // Compress function will just return the input buffer unmodified. cmpBuf, err := alg.Compress(origBuf) if err != nil { - return fmt.Errorf("failed to compress block: %w", err) + return errorCleanup(fmt.Errorf("failed to compress block: %w", err)) } // We only need to write out the metadata header if compression is enabled. @@ -5500,7 +5517,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { if mb.bek != nil && len(cmpBuf) > 0 { bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) if err != nil { - return err + return errorCleanup(err) } mb.bek = bek mb.bek.XORKeyStream(cmpBuf, cmpBuf) @@ -5508,11 +5525,6 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // Write the new block data (which might be compressed or encrypted) to the // temporary file. - errorCleanup := func(err error) error { - tmpFD.Close() - os.Remove(tmpFN) - return err - } if n, err := tmpFD.Write(cmpBuf); err != nil { return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err)) } else if n != len(cmpBuf) { @@ -6488,7 +6500,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store dlen := int(rl) - msgHdrSize slen := int(le.Uint16(hdr[20:])) // Simple sanity check. - if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) { + if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) || rl > rlBadThresh { return nil, errBadMsg } data := buf[msgHdrSize : msgHdrSize+dlen] @@ -7783,9 +7795,9 @@ func (mb *msgBlock) dirtyClose() { } // Should be called with lock held. -func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { +func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { if mb == nil { - return + return nil } // Stop cache expiration timer. if mb.ctmr != nil { @@ -7807,13 +7819,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { // Clear any tracking by subject if we are removing. mb.fss = nil if mb.mfn != _EMPTY_ { - os.Remove(mb.mfn) + err := os.Remove(mb.mfn) + if isPermissionError(err) { + return err + } mb.mfn = _EMPTY_ } if mb.kfn != _EMPTY_ { - os.Remove(mb.kfn) + err := os.Remove(mb.kfn) + if isPermissionError(err) { + return err + } } } + return nil } // Remove a seq from the fss and select new first. @@ -7836,24 +7855,14 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- - // Only one left. - if ss.Msgs == 1 { - if seq == ss.Last { - ss.Last = ss.First - } else { - ss.First = ss.Last - } - ss.firstNeedsUpdate = false - return - } - - // We can lazily calculate the first sequence when needed. + // We can lazily calculate the first/last sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalulate the first sequence for this subject in this block. +// Will recalculate the first and/or last sequence for this subject in this block. // Will avoid slower path message lookups and scan the cache directly instead. -func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { +func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { // Need to make sure messages are loaded. if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -7861,42 +7870,100 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si } } - // Mark first as updated. - ss.firstNeedsUpdate = false - - startSlot := int(startSeq - mb.cache.fseq) + startSlot := int(ss.First - mb.cache.fseq) + if startSlot < 0 { + startSlot = 0 + } if startSlot >= len(mb.cache.idx) { ss.First = ss.Last return - } else if startSlot < 0 { - startSlot = 0 + } + endSlot := int(ss.Last - mb.cache.fseq) + if endSlot < 0 { + endSlot = 0 + } + if endSlot >= len(mb.cache.idx) || startSlot > endSlot { + return } var le = binary.LittleEndian - for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ { - bi := mb.cache.idx[slot] &^ hbit - if bi == dbit { - // delete marker so skip. - continue + if ss.firstNeedsUpdate { + // Mark first as updated. + ss.firstNeedsUpdate = false + + fseq := ss.First + 1 + if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { + fseq = mbFseq } - li := int(bi) - mb.cache.off - if li >= len(mb.cache.buf) { - ss.First = ss.Last - return - } - buf := mb.cache.buf[li:] - hdr := buf[:msgHdrSize] - slen := int(le.Uint16(hdr[20:])) - if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { - seq := le.Uint64(hdr[4:]) - if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + for slot := startSlot; slot < len(mb.cache.idx); slot++ { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. continue } - ss.First = seq - if ss.Msgs == 1 { - ss.Last = seq + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + ss.First = ss.Last + return + } + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + continue + } + ss.First = seq + if ss.Msgs == 1 { + ss.Last = seq + ss.lastNeedsUpdate = false + return + } + // Skip the start slot ahead, if we need to recalculate last we can stop early. + startSlot = slot + break + } + } + } + if ss.lastNeedsUpdate { + // Mark last as updated. + ss.lastNeedsUpdate = false + + lseq := ss.Last - 1 + if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq { + lseq = mbLseq + } + for slot := endSlot; slot >= startSlot; slot-- { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. + continue + } + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + // Can't overwrite ss.Last, just skip. + return + } + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + continue + } + // Sequence should never be lower, but guard against it nonetheless. + if seq < ss.First { + seq = ss.First + } + ss.Last = seq + if ss.Msgs == 1 { + ss.First = seq + ss.firstNeedsUpdate = false + } + return } - return } } } @@ -8178,7 +8245,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) { for { select { case <-t.C: - fs.writeFullState() + err := fs.writeFullState() + if isPermissionError(err) && fs.srv != nil { + fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return + } + case <-qch: return } @@ -8386,7 +8461,11 @@ func (fs *fileStore) _writeFullState(force bool) error { // Protect with dios. <-dios err := os.WriteFile(fn, buf, defaultFilePerms) + // if file system is not writable isPermissionError is set to true dios <- struct{}{} + if isPermissionError(err) { + return err + } // Update dirty if successful. if err == nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/ipqueue.go b/vendor/github.com/nats-io/nats-server/v2/server/ipqueue.go index b26a749ed7..95bf27457e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/ipqueue.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/ipqueue.go @@ -190,14 +190,16 @@ func (q *ipQueue[T]) len() int { } // Empty the queue and consumes the notification signal if present. +// Returns the number of items that were drained from the queue. // Note that this could cause a reader go routine that has been // notified that there is something in the queue (reading from queue's `ch`) // may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`. -func (q *ipQueue[T]) drain() { +func (q *ipQueue[T]) drain() int { if q == nil { - return + return 0 } q.Lock() + olen := len(q.elts) if q.elts != nil { q.resetAndReturnToPool(&q.elts) q.elts, q.pos = nil, 0 @@ -209,6 +211,7 @@ func (q *ipQueue[T]) drain() { default: } q.Unlock() + return olen } // Since the length of the queue goes to 0 after a pop(), it is good to diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go index 2e606e6a6f..c1e709a19e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go @@ -2974,3 +2974,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) { cfg.Duplicates = 0 } } + +func (s *Server) handleWritePermissionError() { + //TODO Check if we should add s.jetStreamOOSPending in condition + if s.JetStreamEnabled() { + s.Errorf("File system permission denied while writing, disabling JetStream") + + go s.DisableJetStream() + + //TODO Send respective advisory if needed, same as in handleOutOfSpace + } +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go index de014e74b7..e7fb21c1a1 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go @@ -836,7 +836,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub limit := atomic.LoadInt64(&js.queueLimit) if pending >= int(limit) { s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending) - s.jsAPIRoutedReqs.drain() + drained := int64(s.jsAPIRoutedReqs.drain()) + atomic.AddInt64(&js.apiInflight, -drained) s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ TypedEvent: TypedEvent{ @@ -846,7 +847,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub }, Server: s.Name(), Domain: js.config.Domain, - Dropped: int64(pending), + Dropped: drained, }) } } @@ -864,8 +865,10 @@ func (s *Server) processJSAPIRoutedRequests() { for { select { case <-queue.ch: - reqs := queue.pop() - for _, r := range reqs { + // Only pop one item at a time here, otherwise if the system is recovering + // from queue buildup, then one worker will pull off all the tasks and the + // others will be starved of work. + for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() { client.pa = r.pa start := time.Now() r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg) @@ -874,7 +877,6 @@ func (s *Server) processJSAPIRoutedRequests() { } atomic.AddInt64(&js.apiInflight, -1) } - queue.recycle(&reqs) case <-s.quitCh: return } @@ -3416,7 +3418,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC Time: start, }, Stream: streamName, - Client: ci, + Client: ci.forAdvisory(), Domain: domain, }) @@ -3548,7 +3550,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC Start: start, End: end, Bytes: int64(total), - Client: ci, + Client: ci.forAdvisory(), Domain: domain, }) @@ -3681,7 +3683,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Accoun }, Stream: mset.name(), State: sr.State, - Client: ci, + Client: ci.forAdvisory(), Domain: s.getOpts().JetStreamDomain, }) @@ -3699,7 +3701,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Accoun Stream: mset.name(), Start: start, End: end, - Client: ci, + Client: ci.forAdvisory(), Domain: s.getOpts().JetStreamDomain, }) @@ -4263,9 +4265,17 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, return } + js.mu.RLock() + meta := cc.meta + js.mu.RUnlock() + + // Since these could wait on the Raft group lock, don't do so under the JS lock. + ourID := meta.ID() + groupLeader := meta.GroupLeader() + groupCreated := meta.Created() + js.mu.RLock() isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName) - ourID := cc.meta.ID() var rg *raftGroup var offline, isMember bool if ca != nil { @@ -4279,7 +4289,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, // Also capture if we think there is no meta leader. var isLeaderLess bool if !isLeader { - isLeaderLess = cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault + isLeaderLess = groupLeader == _EMPTY_ && time.Since(groupCreated) > lostQuorumIntervalDefault } js.mu.RUnlock() @@ -4489,7 +4499,7 @@ func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, sub Time: time.Now().UTC(), }, Server: s.Name(), - Client: ci, + Client: ci.forAdvisory(), Subject: subject, Request: request, Response: response, diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 8f08b1e502..57cec3873c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -267,7 +267,12 @@ func (s *Server) JetStreamSnapshotMeta() error { return errNotLeader } - return meta.InstallSnapshot(js.metaSnapshot()) + snap, err := js.metaSnapshot() + if err != nil { + return err + } + + return meta.InstallSnapshot(snap) } func (s *Server) JetStreamStepdownStream(account, stream string) error { @@ -437,73 +442,6 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { return false } -// Restart the stream in question. -// Should only be called when the stream is known to be in a bad state. -func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) { - js.mu.Lock() - s, cc := js.srv, js.cluster - if cc == nil { - js.mu.Unlock() - return - } - // Need to lookup the one directly from the meta layer, what we get handed is a copy if coming from isStreamHealthy. - asa := cc.streams[acc.Name] - if asa == nil { - js.mu.Unlock() - return - } - sa := asa[csa.Config.Name] - if sa == nil { - js.mu.Unlock() - return - } - // Make sure to clear out the raft node if still present in the meta layer. - if rg := sa.Group; rg != nil && rg.node != nil { - if rg.node.State() != Closed { - rg.node.Stop() - } - rg.node = nil - } - sinceCreation := time.Since(sa.Created) - js.mu.Unlock() - - // Process stream assignment to recreate. - // Check that we have given system enough time to start us up. - // This will be longer than obvious, and matches consumer logic in case system very busy. - if sinceCreation < 10*time.Second { - s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v", - acc, csa.Config.Name, sinceCreation) - return - } - - js.processStreamAssignment(sa) - - // If we had consumers assigned to this server they will be present in the copy, csa. - // They also need to be processed. The csa consumers is a copy of only our consumers, - // those assigned to us, but the consumer assignment's there are direct from the meta - // layer to make this part much easier and avoid excessive lookups. - for _, cca := range csa.consumers { - if cca.deleted { - continue - } - // Need to look up original as well here to make sure node is nil. - js.mu.Lock() - ca := sa.consumers[cca.Name] - if ca != nil && ca.Group != nil { - // Make sure the node is stopped if still running. - if node := ca.Group.node; node != nil && node.State() != Closed { - node.Stop() - } - // Make sure node is wiped. - ca.Group.node = nil - } - js.mu.Unlock() - if ca != nil { - js.processConsumerAssignment(ca) - } - } -} - // isStreamHealthy will determine if the stream is up to date or very close. // For R1 it will make sure the stream is present on this server. func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { @@ -529,7 +467,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { // First lookup stream and make sure its there. mset, err := acc.lookupStream(streamName) if err != nil { - js.restartStream(acc, sa) return false } @@ -554,8 +491,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName) node.Delete() mset.resetClusteredState(nil) - } else if node.State() == Closed { - js.restartStream(acc, sa) } } return false @@ -585,37 +520,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum node := ca.Group.node js.mu.RUnlock() - // When we try to restart we nil out the node if applicable - // and reprocess the consumer assignment. - restartConsumer := func() { - mset.mu.RLock() - accName, streamName := mset.acc.GetName(), mset.cfg.Name - mset.mu.RUnlock() - - js.mu.Lock() - deleted := ca.deleted - // Check that we have not just been created. - if !deleted && time.Since(ca.Created) < 10*time.Second { - s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v", - accName, streamName, consumer, time.Since(ca.Created)) - js.mu.Unlock() - return - } - // Make sure the node is stopped if still running. - if node != nil && node.State() != Closed { - node.Stop() - } - ca.Group.node = nil - js.mu.Unlock() - if !deleted { - js.processConsumerAssignment(ca) - } - } - // Check if not running at all. o := mset.lookupConsumer(consumer) if o == nil { - restartConsumer() return false } @@ -630,11 +537,12 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer) node.Delete() o.deleteWithoutAdvisory() - restartConsumer() - } else if node.State() == Closed { - // We have a consumer, and it should have a running node but it is closed. - o.stop() - restartConsumer() + + // When we try to restart we nil out the node and reprocess the consumer assignment. + js.mu.Lock() + ca.Group.node = nil + js.mu.Unlock() + js.processConsumerAssignment(ca) } } return false @@ -901,15 +809,17 @@ func (js *jetStream) server() *Server { // Will respond if we do not think we have a metacontroller leader. func (js *jetStream) isLeaderless() bool { js.mu.RLock() - defer js.mu.RUnlock() - cc := js.cluster if cc == nil || cc.meta == nil { + js.mu.RUnlock() return false } + meta := cc.meta + js.mu.RUnlock() + // If we don't have a leader. // Make sure we have been running for enough time. - if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault { + if meta.GroupLeader() == _EMPTY_ && time.Since(meta.Created()) > lostQuorumIntervalDefault { return true } return false @@ -921,34 +831,38 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool { return false } js.mu.RLock() - defer js.mu.RUnlock() - cc := js.cluster + started := js.started // If we are not a member we can not say.. if cc.meta == nil { + js.mu.RUnlock() return false } if !rg.isMember(cc.meta.ID()) { + js.mu.RUnlock() return false } // Single peer groups always have a leader if we are here. if rg.node == nil { + js.mu.RUnlock() return false } + node := rg.node + js.mu.RUnlock() // If we don't have a leader. - if rg.node.GroupLeader() == _EMPTY_ { + if node.GroupLeader() == _EMPTY_ { // Threshold for jetstream startup. const startupThreshold = 10 * time.Second - if rg.node.HadPreviousLeader() { + if node.HadPreviousLeader() { // Make sure we have been running long enough to intelligently determine this. - if time.Since(js.started) > startupThreshold { + if time.Since(started) > startupThreshold { return true } } // Make sure we have been running for enough time. - if time.Since(rg.node.Created()) > lostQuorumIntervalDefault { + if time.Since(node.Created()) > lostQuorumIntervalDefault { return true } } @@ -1334,7 +1248,10 @@ func (js *jetStream) monitorCluster() { } // For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact. if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() { - if err := n.InstallSnapshot(js.metaSnapshot()); err == nil { + snap, err := js.metaSnapshot() + if err != nil { + s.Warnf("Error generating JetStream cluster snapshot: %v", err) + } else if err = n.InstallSnapshot(snap); err == nil { lastSnapTime = time.Now() } else if err != errNoSnapAvailable && err != errNodeClosed { s.Warnf("Error snapshotting JetStream cluster state: %v", err) @@ -1528,7 +1445,7 @@ func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConf return StreamConfig{}, false } -func (js *jetStream) metaSnapshot() []byte { +func (js *jetStream) metaSnapshot() ([]byte, error) { start := time.Now() js.mu.RLock() s := js.srv @@ -1568,16 +1485,22 @@ func (js *jetStream) metaSnapshot() []byte { if len(streams) == 0 { js.mu.RUnlock() - return nil + return nil, nil } // Track how long it took to marshal the JSON mstart := time.Now() - b, _ := json.Marshal(streams) + b, err := json.Marshal(streams) mend := time.Since(mstart) js.mu.RUnlock() + // Must not be possible for a JSON marshaling error to result + // in an empty snapshot. + if err != nil { + return nil, err + } + // Track how long it took to compress the JSON cstart := time.Now() snap := s2.Encode(nil, b) @@ -1587,7 +1510,7 @@ func (js *jetStream) metaSnapshot() []byte { s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)", took.Seconds(), nsa, nca, mend.Seconds(), cend.Seconds(), len(b), len(snap)) } - return snap + return snap, nil } func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error { @@ -2411,7 +2334,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps compactInterval = 2 * time.Minute compactSizeMin = 8 * 1024 * 1024 compactNumMin = 65536 - minSnapDelta = 10 * time.Second ) // Spread these out for large numbers on server restart. @@ -2435,16 +2357,15 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // a complete and detailed state which could be costly in terms of memory, cpu and GC. // This only entails how many messages, and the first and last sequence of the stream. // This is all that is needed to detect a change, and we can get this from FilteredState() - // with and empty filter. + // with an empty filter. var lastState SimpleState - var lastSnapTime time.Time // Don't allow the upper layer to install snapshots until we have // fully recovered from disk. isRecovering := true doSnapshot := func() { - if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta { + if mset == nil || isRecovering || isRestore { return } @@ -2462,7 +2383,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil { - lastState, lastSnapTime = curState, time.Now() + lastState = curState } else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning { s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } @@ -2541,10 +2462,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps for { select { case <-s.quitCh: + // Server shutting down, but we might receive this before qch, so try to snapshot. + doSnapshot() return case <-mqch: return case <-qch: + // Clean signal from shutdown routine so do best effort attempt to snapshot. + doSnapshot() return case <-aq.ch: var ne, nb uint64 @@ -2603,12 +2528,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Check about snapshotting // If we have at least min entries to compact, go ahead and try to snapshot/compact. if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs { - // We want to make sure we do not short circuit if transistioning from no clfs. - if pclfs == 0 { - // This is always false by default. - lastState.firstNeedsUpdate = true - lastSnapTime = time.Time{} - } doSnapshot() } @@ -2711,8 +2630,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // keep stream assignment current sa = mset.streamAssignment() - // keep peer list up to date with config - js.checkPeers(mset.raftGroup()) // We get this when we have a new stream assignment caused by an update. // We want to know if we are migrating. if migrating := mset.isMigrating(); migrating { @@ -2800,7 +2717,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Check if we have a quorom. if current >= neededCurrent { s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader) - n.UpdateKnownPeers(newPeers) + n.ProposeKnownPeers(newPeers) n.StepDown(newLeaderPeer) } } @@ -3090,8 +3007,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 { // Skip and update our lseq. last := mset.store.SkipMsg() + mset.mu.Lock() mset.setLastSeq(last) mset.clearAllPreAcks(last) + mset.mu.Unlock() continue } @@ -3318,22 +3237,6 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo { return replicas } -// Will check our node peers and see if we should remove a peer. -func (js *jetStream) checkPeers(rg *raftGroup) { - js.mu.Lock() - defer js.mu.Unlock() - - // FIXME(dlc) - Single replicas? - if rg == nil || rg.node == nil { - return - } - for _, peer := range rg.node.Peers() { - if !rg.isMember(peer.ID) { - rg.node.ProposeRemovePeer(peer.ID) - } - } -} - // Process a leader change for the clustered stream. func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { if mset == nil { @@ -3362,8 +3265,6 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { if isLeader { s.Noticef("JetStream cluster new stream leader for '%s > %s'", account, streamName) s.sendStreamLeaderElectAdvisory(mset) - // Check for peer removal and process here if needed. - js.checkPeers(sa.Group) mset.checkAllowMsgCompress(peers) } else { // We are stepping down. @@ -3579,7 +3480,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { js.processClusterCreateStream(acc, sa) } else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { // We have one here even though we are not a member. This can happen on re-assignment. - s.removeStream(ourID, mset, sa) + s.removeStream(mset, sa) } // If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected. @@ -3667,13 +3568,13 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { js.processClusterUpdateStream(acc, osa, sa) } else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { // We have one here even though we are not a member. This can happen on re-assignment. - s.removeStream(ourID, mset, sa) + s.removeStream(mset, sa) } } -// Common function to remove ourself from this server. +// Common function to remove ourselves from this server. // This can happen on re-assignment, move, etc -func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) { +func (s *Server) removeStream(mset *stream, nsa *streamAssignment) { if mset == nil { return } @@ -3683,7 +3584,6 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) if node.Leader() { node.StepDown(nsa.Group.Preferred) } - node.ProposeRemovePeer(ourID) // shutdown monitor by shutting down raft. node.Delete() } @@ -4472,10 +4372,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } else { // If we are clustered update the known peers. js.mu.RLock() - if node := rg.node; node != nil { + node := rg.node + js.mu.RUnlock() + if node != nil { node.UpdateKnownPeers(ca.Group.Peers) } - js.mu.RUnlock() } // Check if we already have this consumer running. @@ -4943,8 +4844,12 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { for { select { case <-s.quitCh: + // Server shutting down, but we might receive this before qch, so try to snapshot. + doSnapshot(false) return case <-qch: + // Clean signal from shutdown routine so do best effort attempt to snapshot. + doSnapshot(false) return case <-aq.ch: ces := aq.pop() @@ -5009,8 +4914,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // We get this when we have a new consumer assignment caused by an update. // We want to know if we are migrating. rg := o.raftGroup() - // keep peer list up to date with config - js.checkPeers(rg) // If we are migrating, monitor for the new peers to be caught up. replicas, err := o.replica() if err != nil { @@ -5327,8 +5230,6 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err if isLeader { s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.serviceAccount(), streamName, consumerName) s.sendConsumerLeaderElectAdvisory(o) - // Check for peer removal and process here if needed. - js.checkPeers(ca.Group) } else { // We are stepping down. // Make sure if we are doing so because we have lost quorum that we send the appropriate advisories. @@ -7267,23 +7168,29 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset } func encodeAddStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(updateStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(removeStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } @@ -7671,16 +7578,20 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignConsumerOp)) - json.NewEncoder(&bb).Encode(ca) + json.NewEncoder(&bb).Encode(cca) return bb.Bytes() } func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(removeConsumerOp)) - json.NewEncoder(&bb).Encode(ca) + json.NewEncoder(&bb).Encode(cca) return bb.Bytes() } @@ -7691,25 +7602,21 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { } func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { - b, err := json.Marshal(ca) - if err != nil { - return nil - } - // TODO(dlc) - Streaming better approach here probably. + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignCompressedConsumerOp)) - bb.Write(s2.Encode(nil, b)) + s2e := s2.NewWriter(&bb) + json.NewEncoder(s2e).Encode(cca) + s2e.Close() return bb.Bytes() } func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { var ca consumerAssignment - js, err := s2.Decode(nil, buf) - if err != nil { - return nil, err - } - err = json.Unmarshal(js, &ca) - return &ca, err + bb := bytes.NewBuffer(buf) + s2d := s2.NewReader(bb) + return &ca, json.NewDecoder(s2d).Decode(&ca) } var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg") @@ -8654,6 +8561,8 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) { return 0, err } + mset.mu.Lock() + defer mset.mu.Unlock() // Update our lseq. mset.setLastSeq(seq) @@ -8661,11 +8570,9 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) { if len(hdr) > 0 { if msgId := getMsgId(hdr); msgId != _EMPTY_ { if !ddloaded { - mset.mu.Lock() mset.rebuildDedupe() - mset.mu.Unlock() } - mset.storeMsgId(&ddentry{msgId, seq, ts}) + mset.storeMsgIdLocked(&ddentry{msgId, seq, ts}) } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go index 8302fcc404..8c099c7ad8 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go @@ -18,13 +18,22 @@ import ( "time" ) -func (s *Server) publishAdvisory(acc *Account, subject string, adv any) { +// publishAdvisory sends the given advisory into the account. Returns true if +// it was sent, false if not (i.e. due to lack of interest or a marshal error). +func (s *Server) publishAdvisory(acc *Account, subject string, adv any) bool { if acc == nil { acc = s.SystemAccount() if acc == nil { - return + return false } } + + // If there is no one listening for this advisory then save ourselves the effort + // and don't bother encoding the JSON or sending it. + if sl := acc.sl; (sl != nil && !sl.HasInterest(subject)) && !s.hasGatewayInterest(acc.Name, subject) { + return false + } + ej, err := json.Marshal(adv) if err == nil { err = s.sendInternalAccountMsg(acc, subject, ej) @@ -34,6 +43,7 @@ func (s *Server) publishAdvisory(acc *Account, subject string, adv any) { } else { s.Warnf("Advisory could not be serialized for account %q: %v", acc.Name, err) } + return err == nil } // JSAPIAudit is an advisory about administrative actions taken on JetStream diff --git a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go index 26a3f6ec3d..6cd4b3c02f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go @@ -2247,8 +2247,16 @@ func (c *client) sendLeafNodeSubUpdate(key string, n int32) { checkPerms = false } } - if checkPerms && !c.canSubscribe(key) { - return + if checkPerms { + var subject string + if sep := strings.IndexByte(key, ' '); sep != -1 { + subject = key[:sep] + } else { + subject = key + } + if !c.canSubscribe(subject) { + return + } } } // If we are here we can send over to the other side. @@ -2435,7 +2443,6 @@ func (c *client) processLeafSub(argo []byte) (err error) { } key := bytesToString(sub.sid) osub := c.subs[key] - updateGWs := false if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -2446,7 +2453,6 @@ func (c *client) processLeafSub(argo []byte) (err error) { c.sendErr("Invalid Subscription") return nil } - updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. delta = sub.qw - atomic.LoadInt32(&osub.qw) @@ -2469,7 +2475,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { if !spoke { // If we are routing add to the route map for the associated account. srv.updateRouteSubscriptionMap(acc, sub, delta) - if updateGWs { + if srv.gateway.enabled { srv.gatewayUpdateSubInterest(acc.Name, sub, delta) } } @@ -2511,27 +2517,27 @@ func (c *client) processLeafUnsub(arg []byte) error { return nil } - updateGWs := false spoke := c.isSpokeLeafNode() // We store local subs by account and subject and optionally queue name. // LS- will have the arg exactly as the key. sub, ok := c.subs[string(arg)] + if !ok { + // If not found, don't try to update routes/gws/leaf nodes. + c.mu.Unlock() + return nil + } delta := int32(1) - if ok && len(sub.queue) > 0 { + if len(sub.queue) > 0 { delta = sub.qw } c.mu.Unlock() - if ok { - c.unsubscribe(acc, sub, true, true) - updateGWs = srv.gateway.enabled - } - + c.unsubscribe(acc, sub, true, true) if !spoke { // If we are routing subtract from the route map for the associated account. srv.updateRouteSubscriptionMap(acc, sub, -delta) // Gateways - if updateGWs { + if srv.gateway.enabled { srv.gatewayUpdateSubInterest(acc.Name, sub, -delta) } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index e2ca1cae29..350cfa388e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -143,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int return ErrMaxBytes } // If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room. - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } sm, ok := ms.msgs[ss.First] if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) { @@ -430,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje var totalSkipped uint64 // We will track start and end sequences as we go. ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + if fss.firstNeedsUpdate || fss.lastNeedsUpdate { + ms.recalculateForSubj(bytesToString(subj), fss) } if sseq <= fss.First { update(fss) @@ -585,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { fss := make(map[string]SimpleState) ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) { subjs := string(subj) - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subjs, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subjs, ss) } oss := fss[subjs] if oss.First == 0 { // New @@ -675,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo var totalSkipped uint64 // We will track start and end sequences as we go. IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + if fss.firstNeedsUpdate || fss.lastNeedsUpdate { + ms.recalculateForSubj(bytesToString(subj), fss) } if sseq <= fss.First { update(fss) @@ -793,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { return } for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs { - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } if !ms.removeMsg(ss.First, false) { break @@ -1009,8 +1009,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { if sm := ms.msgs[seq]; sm != nil { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ - delete(ms.msgs, seq) ms.removeSeqPerSubject(sm.subj, seq) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, seq) } } if purged > ms.state.Msgs { @@ -1098,8 +1099,9 @@ func (ms *memStore) Truncate(seq uint64) error { if sm := ms.msgs[i]; sm != nil { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - delete(ms.msgs, i) ms.removeSeqPerSubject(sm.subj, i) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, i) } } // Reset last. @@ -1265,8 +1267,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store if !ok { continue } - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } if ss.First < fseq { fseq = ss.First @@ -1360,34 +1362,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // If we know we only have 1 msg left don't need to search for next first. - if ss.Msgs == 1 { - if seq == ss.Last { - ss.Last = ss.First - } else { - ss.First = ss.Last - } - ss.firstNeedsUpdate = false - } else { - ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate - } + // We can lazily calculate the first/last sequence when needed. + ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalculate the first sequence for this subject in this block. +// Will recalculate the first and/or last sequence for this subject. // Lock should be held. -func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { - tseq := startSeq + 1 - if tseq < ms.state.FirstSeq { - tseq = ms.state.FirstSeq - } - for ; tseq <= ss.Last; tseq++ { - if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { - ss.First = tseq - if ss.Msgs == 1 { - ss.Last = tseq +func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) { + if ss.firstNeedsUpdate { + tseq := ss.First + 1 + if tseq < ms.state.FirstSeq { + tseq = ms.state.FirstSeq + } + for ; tseq <= ss.Last; tseq++ { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.First = tseq + ss.firstNeedsUpdate = false + if ss.Msgs == 1 { + ss.Last = tseq + ss.lastNeedsUpdate = false + return + } + break + } + } + } + if ss.lastNeedsUpdate { + tseq := ss.Last - 1 + if tseq > ms.state.LastSeq { + tseq = ms.state.LastSeq + } + for ; tseq >= ss.First; tseq-- { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.Last = tseq + ss.lastNeedsUpdate = false + if ss.Msgs == 1 { + ss.First = tseq + ss.firstNeedsUpdate = false + } + return } - ss.firstNeedsUpdate = false - return } } } @@ -1403,7 +1418,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - delete(ms.msgs, seq) if ms.state.Msgs > 0 { ms.state.Msgs-- if ss > ms.state.Bytes { @@ -1428,6 +1442,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { // Remove any per subject tracking. ms.removeSeqPerSubject(sm.subj, seq) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, seq) if ms.scb != nil { // We do not want to hold any locks here. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 64d8e4df3c..427e8ce677 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -61,6 +61,7 @@ type RaftNode interface { ID() string Group() string Peers() []*Peer + ProposeKnownPeers(knownPeers []string) UpdateKnownPeers(knownPeers []string) ProposeAddPeer(peer string) error ProposeRemovePeer(peer string) error @@ -1326,6 +1327,12 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { return false } + if n.paused && n.hcommit > n.commit { + // We're currently paused, waiting to be resumed to apply pending commits. + n.debug("Not current, waiting to resume applies commit=%d, hcommit=%d", n.commit, n.hcommit) + return false + } + if n.commit == n.applied { // At this point if we are current, we can return saying so. clearBehindState() @@ -1556,14 +1563,12 @@ func (n *raft) ID() string { if n == nil { return _EMPTY_ } - n.RLock() - defer n.RUnlock() + // Lock not needed as n.id is never changed after creation. return n.id } func (n *raft) Group() string { - n.RLock() - defer n.RUnlock() + // Lock not needed as n.group is never changed after creation. return n.group } @@ -1588,19 +1593,23 @@ func (n *raft) Peers() []*Peer { return peers } +// Update and propose our known set of peers. +func (n *raft) ProposeKnownPeers(knownPeers []string) { + // If we are the leader update and send this update out. + if n.State() != Leader { + return + } + n.UpdateKnownPeers(knownPeers) + n.sendPeerState() +} + // Update our known set of peers. func (n *raft) UpdateKnownPeers(knownPeers []string) { n.Lock() // Process like peer state update. ps := &peerState{knownPeers, len(knownPeers), n.extSt} n.processPeerState(ps) - isLeader := n.State() == Leader n.Unlock() - - // If we are the leader send this update out as well. - if isLeader { - n.sendPeerState() - } } // ApplyQ returns the apply queue that new commits will be sent to for the @@ -1615,8 +1624,7 @@ func (n *raft) LeadChangeC() <-chan bool { return n.leadc } func (n *raft) QuitC() <-chan struct{} { return n.quit } func (n *raft) Created() time.Time { - n.RLock() - defer n.RUnlock() + // Lock not needed as n.created is never changed after creation. return n.created } @@ -1840,7 +1848,7 @@ runner: // just will remove them from the central monitoring map queues := []interface { unregister() - drain() + drain() int }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} for _, q := range queues { q.drain() @@ -3853,6 +3861,10 @@ func (n *raft) setWriteErrLocked(err error) { n.error("Critical write error: %v", err) n.werr = err + if isPermissionError(err) { + go n.s.handleWritePermissionError() + } + if isOutOfSpaceErr(err) { // For now since this can be happening all under the covers, we will call up and disable JetStream. go n.s.handleOutOfSpace(nil) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/route.go b/vendor/github.com/nats-io/nats-server/v2/server/route.go index 0c455547c9..a865122e61 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/route.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/route.go @@ -1348,8 +1348,6 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) { return nil } - updateGWs := false - _keya := [128]byte{} _key := _keya[:0] @@ -1373,19 +1371,21 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) { if ok { delete(c.subs, key) acc.sl.Remove(sub) - updateGWs = srv.gateway.enabled if len(sub.queue) > 0 { delta = sub.qw } } c.mu.Unlock() - if updateGWs { - srv.gatewayUpdateSubInterest(accountName, sub, -delta) - } + // Update gateways and leaf nodes only if the subscription was found. + if ok { + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(accountName, sub, -delta) + } - // Now check on leafnode updates. - acc.updateLeafNodes(sub, -delta) + // Now check on leafnode updates. + acc.updateLeafNodes(sub, -delta) + } if c.opts.Verbose { c.sendOK() @@ -1600,7 +1600,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { // We use the sub.sid for the key of the c.subs map. key := bytesToString(sub.sid) osub := c.subs[key] - updateGWs := false if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -1611,7 +1610,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { c.sendErr("Invalid Subscription") return nil } - updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. delta = sub.qw - atomic.LoadInt32(&osub.qw) @@ -1620,7 +1618,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { } c.mu.Unlock() - if updateGWs { + if srv.gateway.enabled { srv.gatewayUpdateSubInterest(acc.Name, sub, delta) } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/store.go b/vendor/github.com/nats-io/nats-server/v2/server/store.go index 72e039816e..2d72f69474 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/store.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/store.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "os" "strings" "time" "unsafe" @@ -166,6 +167,8 @@ type SimpleState struct { // Internal usage for when the first needs to be updated before use. firstNeedsUpdate bool + // Internal usage for when the last needs to be updated before use. + lastNeedsUpdate bool } // LostStreamData indicates msgs that have been lost. @@ -778,3 +781,7 @@ func copyString(s string) string { copy(b, s) return bytesToString(b) } + +func isPermissionError(err error) bool { + return err != nil && os.IsPermission(err) +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index a3a7c8fdc7..a2883631d4 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -1039,10 +1039,10 @@ func (mset *stream) lastSeq() uint64 { return mset.lseq } +// Set last seq. +// Write lock should be held. func (mset *stream) setLastSeq(lseq uint64) { - mset.mu.Lock() mset.lseq = lseq - mset.mu.Unlock() } func (mset *stream) sendCreateAdvisory() { @@ -2051,11 +2051,16 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err store.FastState(&state) fseq, lseq := state.FirstSeq, state.LastSeq + mset.mu.Lock() // Check if our last has moved past what our original last sequence was, if so reset. if lseq > mlseq { mset.setLastSeq(lseq) } + // Clear any pending acks below first seq. + mset.clearAllPreAcksBelowFloor(fseq) + mset.mu.Unlock() + // Purge consumers. // Check for filtered purge. if preq != nil && preq.Subject != _EMPTY_ { @@ -2102,7 +2107,14 @@ func (mset *stream) deleteMsg(seq uint64) (bool, error) { if mset.closed.Load() { return false, errStreamClosed } - return mset.store.RemoveMsg(seq) + removed, err := mset.store.RemoveMsg(seq) + if err != nil { + return removed, err + } + mset.mu.Lock() + mset.clearAllPreAcks(seq) + mset.mu.Unlock() + return removed, err } // EraseMsg will securely remove a message and rewrite the data with random data. @@ -2110,7 +2122,14 @@ func (mset *stream) eraseMsg(seq uint64) (bool, error) { if mset.closed.Load() { return false, errStreamClosed } - return mset.store.EraseMsg(seq) + removed, err := mset.store.EraseMsg(seq) + if err != nil { + return removed, err + } + mset.mu.Lock() + mset.clearAllPreAcks(seq) + mset.mu.Unlock() + return removed, err } // Are we a mirror? @@ -4000,15 +4019,8 @@ func (mset *stream) purgeMsgIds() { } } -// storeMsgId will store the message id for duplicate detection. -func (mset *stream) storeMsgId(dde *ddentry) { - mset.mu.Lock() - defer mset.mu.Unlock() - mset.storeMsgIdLocked(dde) -} - // storeMsgIdLocked will store the message id for duplicate detection. -// Lock should he held. +// Lock should be held. func (mset *stream) storeMsgIdLocked(dde *ddentry) { if mset.ddmap == nil { mset.ddmap = make(map[string]*ddentry) @@ -4628,6 +4640,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } if err != nil { + if isPermissionError(err) { + mset.mu.Unlock() + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + mset.srv.DisableJetStream() + mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err) + return err + } // If we did not succeed put those values back and increment clfs in case we are clustered. var state StreamState mset.store.FastState(&state) diff --git a/vendor/modules.txt b/vendor/modules.txt index 145d304cc3..80d60b3067 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -984,7 +984,7 @@ github.com/munnerz/goautoneg # github.com/nats-io/jwt/v2 v2.7.3 ## explicit; go 1.22 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.10.24 +# github.com/nats-io/nats-server/v2 v2.10.25 ## explicit; go 1.22 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/fastrand @@ -2208,7 +2208,7 @@ golang.org/x/text/transform golang.org/x/text/unicode/bidi golang.org/x/text/unicode/norm golang.org/x/text/width -# golang.org/x/time v0.8.0 +# golang.org/x/time v0.9.0 ## explicit; go 1.18 golang.org/x/time/rate # golang.org/x/tools v0.28.0