From a85e853365fb2af70be56acbb63d5600121498d5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 12 Aug 2025 19:53:59 +0000 Subject: [PATCH] build(deps): bump github.com/nats-io/nats-server/v2 Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.11.6 to 2.11.7. - [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml) - [Commits](https://github.com/nats-io/nats-server/compare/v2.11.6...v2.11.7) --- updated-dependencies: - dependency-name: github.com/nats-io/nats-server/v2 dependency-version: 2.11.7 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- .../nats-io/nats-server/v2/server/client.go | 2 +- .../nats-io/nats-server/v2/server/const.go | 2 +- .../nats-io/nats-server/v2/server/consumer.go | 158 +++++- .../nats-io/nats-server/v2/server/errors.json | 10 + .../nats-io/nats-server/v2/server/events.go | 7 +- .../nats-server/v2/server/filestore.go | 527 ++++++++++-------- .../nats-server/v2/server/jetstream.go | 27 +- .../v2/server/jetstream_cluster.go | 24 +- .../v2/server/jetstream_errors_generated.go | 14 + .../nats-io/nats-server/v2/server/jwt.go | 11 +- .../nats-io/nats-server/v2/server/memstore.go | 74 ++- .../nats-io/nats-server/v2/server/monitor.go | 345 ++++++------ .../nats-io/nats-server/v2/server/opts.go | 9 +- .../nats-io/nats-server/v2/server/raft.go | 145 ++--- .../nats-io/nats-server/v2/server/sdm.go | 11 +- .../nats-io/nats-server/v2/server/server.go | 14 +- .../nats-io/nats-server/v2/server/store.go | 2 - .../nats-io/nats-server/v2/server/stream.go | 22 +- .../nats-io/nats-server/v2/server/sublist.go | 6 + .../nats-io/nats-server/v2/server/thw/thw.go | 46 +- vendor/modules.txt | 2 +- 23 files changed, 886 insertions(+), 578 deletions(-) diff --git a/go.mod b/go.mod index 0ba51bf3cf..284dfadf6c 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.11.6 + github.com/nats-io/nats-server/v2 v2.11.7 github.com/nats-io/nats.go v1.43.0 github.com/oklog/run v1.2.0 github.com/olekukonko/tablewriter v1.0.8 diff --git a/go.sum b/go.sum index 28c0879525..5ef738ebf8 100644 --- a/go.sum +++ b/go.sum @@ -821,8 +821,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI= github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats-server/v2 v2.11.6 h1:4VXRjbTUFKEB+7UoaKL3F5Y83xC7MxPoIONOnGgpkHw= -github.com/nats-io/nats-server/v2 v2.11.6/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs= +github.com/nats-io/nats-server/v2 v2.11.7 h1:lINWQ/Hb3cnaoHmWTjj/7WppZnaSh9C/1cD//nHCbms= +github.com/nats-io/nats-server/v2 v2.11.7/go.mod h1:DchDPVzAsAPqhqm7VLedX0L7hjnV/SYtlmsl9F8U53s= github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug= github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index caf3f91532..8accdb2742 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -4360,7 +4360,7 @@ func sliceHeader(key string, hdr []byte) []byte { if len(hdr) == 0 { return nil } - index := bytes.Index(hdr, stringToBytes(key)) + index := bytes.Index(hdr, stringToBytes(key+":")) hdrLen := len(hdr) // Check that we have enough characters, this will handle the -1 case of the key not // being found and will also handle not having enough characters for trailing CRLF. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index bd46d8a4e0..1896ba575b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -58,7 +58,7 @@ func init() { const ( // VERSION is the current version for the server. - VERSION = "2.11.6" + VERSION = "2.11.7" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index 5450449584..d15b6c46c7 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -437,7 +437,8 @@ type consumer struct { rdqi avl.SequenceSet rdc map[uint64]uint64 replies map[uint64]string - pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum. + pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum. + waitingDeliveries map[string]*waitingDelivery // (Optional) request timeout messages that need to wait for replicated deliveries first. maxdc uint64 waiting *waitQueue cfg ConsumerConfig @@ -819,6 +820,9 @@ func checkConsumerCfg( } if config.PriorityPolicy != PriorityNone { + if config.DeliverSubject != "" { + return NewJSConsumerPushWithPriorityGroupError() + } if len(config.PriorityGroups) == 0 { return NewJSConsumerPriorityPolicyWithoutGroupError() } @@ -1846,7 +1850,7 @@ func (o *consumer) deleteNotActive() { } else { // Pull mode. elapsed := time.Since(o.waiting.last) - if elapsed <= o.cfg.InactiveThreshold { + if elapsed < o.dthresh { // These need to keep firing so reset but use delta. if o.dtmr != nil { o.dtmr.Reset(o.dthresh - elapsed) @@ -1866,6 +1870,43 @@ func (o *consumer) deleteNotActive() { o.mu.Unlock() return } + + // We now know we have no waiting requests, and our last request was long ago. + // However, based on AckWait the consumer could still be actively processing, + // even if we haven't been informed if there were no acks in the meantime. + // We must wait for the message that expires last and start counting down the + // inactive threshold from there. + now := time.Now().UnixNano() + l := len(o.cfg.BackOff) + var delay time.Duration + var ackWait time.Duration + for _, p := range o.pending { + if l == 0 { + ackWait = o.ackWait(0) + } else { + bi := int(o.rdc[p.Sequence]) + if bi < 0 { + bi = 0 + } else if bi >= l { + bi = l - 1 + } + ackWait = o.ackWait(o.cfg.BackOff[bi]) + } + if ts := p.Timestamp + ackWait.Nanoseconds() + o.dthresh.Nanoseconds(); ts > now { + delay = max(delay, time.Duration(ts-now)) + } + } + // We'll wait for the latest time we expect an ack, plus the inactive threshold. + // Acknowledging a message will reset this back down to just the inactive threshold. + if delay > 0 { + if o.dtmr != nil { + o.dtmr.Reset(delay) + } else { + o.dtmr = time.AfterFunc(delay, o.deleteNotActive) + } + o.mu.Unlock() + return + } } s, js := o.mset.srv, o.srv.js.Load() @@ -2540,11 +2581,23 @@ func (o *consumer) addAckReply(sseq uint64, reply string) { // Used to remember messages that need to be sent for a replicated consumer, after delivered quorum. // Lock should be held. func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) { - // Is not explicitly limited in size, but will at maximum hold maximum ack pending. + // Is not explicitly limited in size, but will at most hold maximum ack pending. if o.pendingDeliveries == nil { o.pendingDeliveries = make(map[uint64]*jsPubMsg) } o.pendingDeliveries[pmsg.seq] = pmsg + + // Is not explicitly limited in size, but will at most hold maximum waiting requests. + if o.waitingDeliveries == nil { + o.waitingDeliveries = make(map[string]*waitingDelivery) + } + if wd, ok := o.waitingDeliveries[pmsg.dsubj]; ok { + wd.seq = pmsg.seq + } else { + wd := wdPool.Get().(*waitingDelivery) + wd.seq = pmsg.seq + o.waitingDeliveries[pmsg.dsubj] = wd + } } // Lock should be held. @@ -3446,6 +3499,28 @@ func (wr *waitingRequest) recycle() { } } +// Represents an (optional) request timeout that's sent after waiting for replicated deliveries. +type waitingDelivery struct { + seq uint64 + pn int // Pending messages. + pb int // Pending bytes. +} + +// sync.Pool for waiting deliveries. +var wdPool = sync.Pool{ + New: func() any { + return new(waitingDelivery) + }, +} + +// Force a recycle. +func (wd *waitingDelivery) recycle() { + if wd != nil { + wd.seq, wd.pn, wd.pb = 0, 0, 0 + wdPool.Put(wd) + } +} + // waiting queue for requests that are waiting for new messages to arrive. type waitQueue struct { n, max int @@ -3721,8 +3796,19 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { } } else { // We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there. - hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) - o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + rdWait := o.replicateDeliveries() + if rdWait { + // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. + if wd, ok := o.waitingDeliveries[wr.reply]; !ok { + rdWait = false + } else { + wd.pn, wd.pb = wr.n, wr.b + } + } + if !rdWait { + hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + } o.waiting.removeCurrent() if o.node != nil { o.removeClusterPendingRequest(wr.reply) @@ -4187,8 +4273,19 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { for wr := wq.head; wr != nil; { // Check expiration. if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) { - hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) - o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + rdWait := o.replicateDeliveries() + if rdWait { + // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. + if wd, ok := o.waitingDeliveries[wr.reply]; !ok { + rdWait = false + } else { + wd.pn, wd.pb = wr.n, wr.b + } + } + if !rdWait { + hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + } wr = remove(pre, wr) continue } @@ -4368,16 +4465,19 @@ func (o *consumer) processInboundAcks(qch chan struct{}) { for { select { case <-o.ackMsgs.ch: + // If we have an inactiveThreshold set, mark our activity. + // Do this before processing acks, otherwise we might race if there are no pending messages + // anymore and the inactivity threshold kicks in before we're able to mark activity. + if hasInactiveThresh { + o.suppressDeletion() + } + acks := o.ackMsgs.pop() for _, ack := range acks { o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg) ack.returnToPool() } o.ackMsgs.recycle(&acks) - // If we have an inactiveThreshold set, mark our activity. - if hasInactiveThresh { - o.suppressDeletion() - } case <-ticker.C: o.checkAckFloor() case <-qch: @@ -4425,8 +4525,11 @@ func (o *consumer) suppressDeletion() { // if dtmr is not nil we have started the countdown, simply reset to threshold. o.dtmr.Reset(o.dthresh) } else if o.isPullMode() && o.waiting != nil { - // Pull mode always has timer running, just update last on waiting queue. + // Pull mode always has timer running, update last on waiting queue. o.waiting.last = time.Now() + if o.dtmr != nil { + o.dtmr.Reset(o.dthresh) + } } } @@ -4485,7 +4588,6 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { delay time.Duration sz int wrn, wrb int - wrNoWait bool ) o.mu.Lock() @@ -4564,7 +4666,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { if o.isPushMode() { dsubj = o.dsubj } else if wr := o.nextWaiting(sz); wr != nil { - wrn, wrb, wrNoWait = wr.n, wr.b, wr.noWait + wrn, wrb = wr.n, wr.b dsubj = wr.reply if o.cfg.PriorityPolicy == PriorityPinnedClient { // FIXME(jrm): Can we make this prettier? @@ -4639,7 +4741,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { } // Do actual delivery. - o.deliverMsg(dsubj, ackReply, pmsg, dc, rp, wrNoWait) + o.deliverMsg(dsubj, ackReply, pmsg, dc, rp) // If given request fulfilled batch size, but there are still pending bytes, send information about it. if wrn <= 0 && wrb > 0 { @@ -4838,7 +4940,7 @@ func convertToHeadersOnly(pmsg *jsPubMsg) { // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. -func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy, wrNoWait bool) { +func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) { if o.mset == nil { pmsg.returnToPool() return @@ -4871,15 +4973,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, } // Send message. - // If we're replicated we MUST only send the message AFTER we've got quorum for updating - // delivered state. Otherwise, we could be in an invalid state after a leader change. - // We can send immediately if not replicated, not using acks, or using flow control (incompatible). - // TODO(mvv): If NoWait we also bypass replicating first. - // Ideally we'd only send the NoWait request timeout after replication and delivery. - if o.node == nil || ap == AckNone || o.cfg.FlowControl || wrNoWait { - o.outq.send(pmsg) - } else { + if o.replicateDeliveries() { o.addReplicatedQueuedMsg(pmsg) + } else { + o.outq.send(pmsg) } // Flow control. @@ -4902,6 +4999,15 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, } } +// replicateDeliveries returns whether deliveries should be replicated before sending them. +// If we're replicated we MUST only send the message AFTER we've got quorum for updating +// delivered state. Otherwise, we could be in an invalid state after a leader change. +// We can send immediately if not replicated, not using acks, or using flow control (incompatible). +// Lock should be held. +func (o *consumer) replicateDeliveries() bool { + return o.node != nil && o.cfg.AckPolicy != AckNone && !o.cfg.FlowControl +} + func (o *consumer) needFlowControl(sz int) bool { if o.maxpb == 0 { return false @@ -6148,4 +6254,8 @@ func (o *consumer) resetPendingDeliveries() { pmsg.returnToPool() } o.pendingDeliveries = nil + for _, wd := range o.waitingDeliveries { + wd.recycle() + } + o.waitingDeliveries = nil } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/errors.json b/vendor/github.com/nats-io/nats-server/v2/server/errors.json index 7b90366a6e..3a80cc4d64 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/errors.json +++ b/vendor/github.com/nats-io/nats-server/v2/server/errors.json @@ -1658,5 +1658,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerPushWithPriorityGroupErr", + "code": 400, + "error_code": 10178, + "description": "priority groups can not be used with push consumers", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go index d579618bb1..3d44340b7b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go @@ -1401,10 +1401,9 @@ func (s *Server) initEventTracking() { } } - // User info. - // TODO(dlc) - Can be internal and not forwarded since bound server for the client connection - // is only one that will answer. This breaks tests since we still forward on remote server connect. - if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil { + // User info. Do not propagate interest so that we know the local server to the connection + // is the only one that will answer the requests. + if _, err := s.sysSubscribeInternal(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil { s.Errorf("Error setting up internal tracking: %v", err) return } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index 5a8b22cb61..1181b58ced 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -306,8 +306,8 @@ const ( consumerDir = "obs" // Index file for a consumer. consumerState = "o.dat" - // The suffix that will be given to a new temporary block during compression. - compressTmpSuffix = ".tmp" + // The suffix that will be given to a new temporary block for compression or when rewriting the full file. + blkTmpSuffix = ".tmp" // This is where we keep state on templates. tmplsDir = "templates" // Maximum size of a write buffer we may consider for re-use. @@ -651,7 +651,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { // Create or delete the THW if needed. if cfg.AllowMsgTTL && fs.ttls == nil { - fs.ttls = thw.NewHashWheel() + fs.recoverTTLState() } else if !cfg.AllowMsgTTL && fs.ttls != nil { fs.ttls = nil } @@ -1201,7 +1201,9 @@ func (fs *fileStore) rebuildStateLocked(ld *LostStreamData) { fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() } } - if lseq := atomic.LoadUint64(&mb.last.seq); lseq > fs.state.LastSeq { + // Preserve last time, could have erased the last message in one block, and then + // have a tombstone with the proper timestamp afterward in another block + if lseq := atomic.LoadUint64(&mb.last.seq); lseq >= fs.state.LastSeq { fs.state.LastSeq = lseq if mb.last.ts == 0 { fs.state.LastTime = time.Time{} @@ -1271,10 +1273,13 @@ func (mb *msgBlock) convertCipher() error { buf, _ := mb.loadBlock(nil) bek.XORKeyStream(buf, buf) - // Make sure we can parse with old cipher and key file. - if err = mb.indexCacheBuf(buf); err != nil { + // Check for compression, and make sure we can parse with old cipher and key file. + if nbuf, err := mb.decompressIfNeeded(buf); err != nil { + return err + } else if err = mb.indexCacheBuf(nbuf); err != nil { return err } + // Reset the cache since we just read everything in. mb.cache = nil @@ -1306,6 +1311,10 @@ func (mb *msgBlock) convertToEncrypted() error { if err != nil { return err } + // Check for compression. + if buf, err = mb.decompressIfNeeded(buf); err != nil { + return err + } if err := mb.indexCacheBuf(buf); err != nil { // This likely indicates this was already encrypted or corrupt. mb.cache = nil @@ -1376,15 +1385,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { firstNeedsSet := true // Check if we need to decrypt. - if mb.bek != nil && len(buf) > 0 { - // Recreate to reset counter. - mb.bek, err = genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) - if err != nil { - return nil, nil, err - } - mb.bek.XORKeyStream(buf, buf) + if err = mb.encryptOrDecryptIfNeeded(buf); err != nil { + return nil, nil, err } - // Check for compression. if buf, err = mb.decompressIfNeeded(buf); err != nil { return nil, nil, err @@ -1799,6 +1802,11 @@ func (fs *fileStore) recoverFullState() (rerr error) { } bi += n } + + // Pre-emptively mark block as closed, we'll confirm this block + // still exists on disk and report it as lost if not. + mb.closed = true + // Only add in if not empty or the lmb. if mb.msgs > 0 || i == lastIndex { fs.addMsgBlock(mb) @@ -1873,10 +1881,26 @@ func (fs *fileStore) recoverFullState() (rerr error) { if index > blkIndex { fs.warn("Stream state outdated, found extra blocks, will rebuild") return errPriorState + } else if mb, ok := fs.bim[index]; ok { + mb.closed = false } } } + var rebuild bool + for _, mb := range fs.blks { + if mb.closed { + rebuild = true + if ld, _, _ := mb.rebuildState(); ld != nil { + fs.addLostData(ld) + } + fs.warn("Stream state detected prior state, could not locate msg block %d", mb.index) + } + } + if rebuild { + return errPriorState + } + // We check first and last seq and number of msgs and bytes. If there is a difference, // return and error so we rebuild from the message block state on disk. if !trackingStatesEqual(&fs.state, &mstate) { @@ -1888,6 +1912,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { return nil } +// Lock should be held. func (fs *fileStore) recoverTTLState() error { // See if we have a timed hash wheel for TTLs. <-dios @@ -1917,31 +1942,34 @@ func (fs *fileStore) recoverTTLState() error { defer fs.resetAgeChk(0) if fs.state.Msgs > 0 && ttlseq <= fs.state.LastSeq { fs.warn("TTL state is outdated; attempting to recover using linear scan (seq %d to %d)", ttlseq, fs.state.LastSeq) - var sm StoreMsg - mb := fs.selectMsgBlock(ttlseq) - if mb == nil { - return nil - } - mblseq := atomic.LoadUint64(&mb.last.seq) + var ( + mb *msgBlock + sm StoreMsg + mblseq uint64 + ) for seq := ttlseq; seq <= fs.state.LastSeq; seq++ { retry: + if mb == nil { + if mb = fs.selectMsgBlock(seq); mb == nil { + // Selecting the message block should return a block that contains this sequence, + // or a later block if it can't be found. + // It's an error if we can't find any block within the bounds of first and last seq. + fs.warn("Error loading msg block with seq %d for recovering TTL: %s", seq) + continue + } + seq = atomic.LoadUint64(&mb.first.seq) + mblseq = atomic.LoadUint64(&mb.last.seq) + } if mb.ttls == 0 { // None of the messages in the block have message TTLs so don't // bother doing anything further with this block, skip to the end. seq = atomic.LoadUint64(&mb.last.seq) + 1 } if seq > mblseq { - // We've reached the end of the loaded block, see if we can continue - // by loading the next one. + // We've reached the end of the loaded block, so let's go back to the + // beginning and process the next block. mb.tryForceExpireCache() - if mb = fs.selectMsgBlock(seq); mb == nil { - // TODO(nat): Deal with gaps properly. Right now this will be - // probably expensive on CPU. - continue - } - mblseq = atomic.LoadUint64(&mb.last.seq) - // At this point we've loaded another block, so let's go back to the - // beginning and see if we need to skip this one too. + mb = nil goto retry } msg, _, err := mb.fetchMsgNoCopy(seq, &sm) @@ -1983,12 +2011,9 @@ func (mb *msgBlock) lastChecksum() []byte { } if mb.bek != nil { if buf, _ := mb.loadBlock(nil); len(buf) >= checksumSize { - bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) - if err != nil { + if err = mb.encryptOrDecryptIfNeeded(buf); err != nil { return nil } - mb.bek = bek - mb.bek.XORKeyStream(buf, buf) copy(lchk[0:], buf[len(buf)-checksumSize:]) } } else { @@ -2082,7 +2107,9 @@ func (fs *fileStore) recoverMsgs() error { fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() } } - if lseq := atomic.LoadUint64(&mb.last.seq); lseq > fs.state.LastSeq { + // Preserve last time, could have erased the last message in one block, and then + // have a tombstone with the proper timestamp afterward in another block + if lseq := atomic.LoadUint64(&mb.last.seq); lseq >= fs.state.LastSeq { fs.state.LastSeq = lseq if mb.last.ts == 0 { fs.state.LastTime = time.Time{} @@ -3935,16 +3962,27 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { var rbuf []byte if lmb := fs.lmb; lmb != nil { + lmb.mu.Lock() index = lmb.index + 1 + + // Flush any pending messages. + lmb.flushPendingMsgsLocked() // Determine if we can reclaim any resources here. - if fs.fip { - lmb.mu.Lock() - lmb.closeFDsLocked() - if lmb.cache != nil { - // Reset write timestamp and see if we can expire this cache. - rbuf = lmb.tryExpireWriteCache() - } - lmb.mu.Unlock() + lmb.closeFDsLockedNoCheck() + if lmb.cache != nil { + // Reset write timestamp and see if we can expire this cache. + rbuf = lmb.tryExpireWriteCache() + } + lmb.mu.Unlock() + + if fs.fcfg.Compression != NoCompression { + // We've now reached the end of this message block, if we want + // to compress blocks then now's the time to do it. + go func() { + lmb.mu.Lock() + defer lmb.mu.Unlock() + lmb.recompressOnDiskIfNeeded() + }() } } @@ -4208,20 +4246,19 @@ func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, // we will place an empty record marking the sequence as used. The // sequence will be marked erased. // fs lock should be held. -func (mb *msgBlock) skipMsg(seq uint64, now time.Time) { +func (mb *msgBlock) skipMsg(seq uint64, now int64) { if mb == nil { return } var needsRecord bool - nowts := ats.AccessTime() mb.mu.Lock() // If we are empty can just do meta. if mb.msgs == 0 { atomic.StoreUint64(&mb.last.seq, seq) - mb.last.ts = nowts + mb.last.ts = now atomic.StoreUint64(&mb.first.seq, seq+1) - mb.first.ts = nowts + mb.first.ts = 0 needsRecord = mb == mb.fs.lmb if needsRecord && mb.rbytes > 0 { // We want to make sure since we have no messages @@ -4240,7 +4277,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) { mb.mu.Unlock() if needsRecord { - mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, nowts, true) + mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, now, true) } else { mb.kickFlusher() } @@ -4258,18 +4295,18 @@ func (fs *fileStore) SkipMsg() uint64 { } // Grab time and last seq. - now, seq := time.Now(), fs.state.LastSeq+1 + now, seq := ats.AccessTime(), fs.state.LastSeq+1 // Write skip msg. mb.skipMsg(seq, now) // Update fs state. - fs.state.LastSeq, fs.state.LastTime = seq, now + fs.state.LastSeq, fs.state.LastTime = seq, time.Unix(0, now).UTC() if fs.state.Msgs == 0 { - fs.state.FirstSeq, fs.state.FirstTime = seq, now + fs.state.FirstSeq, fs.state.FirstTime = seq, time.Time{} } if seq == fs.state.FirstSeq { - fs.state.FirstSeq, fs.state.FirstTime = seq+1, now + fs.state.FirstSeq, fs.state.FirstTime = seq+1, time.Time{} } // Mark as dirty for stream state. fs.dirty++ @@ -4299,11 +4336,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { numDeletes += mb.dmap.Size() } if mb == nil || numDeletes > maxDeletes && mb.msgs > 0 || mb.msgs > 0 && mb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize { - if mb != nil && fs.fcfg.Compression != NoCompression { - // We've now reached the end of this message block, if we want - // to compress blocks then now's the time to do it. - go mb.recompressOnDiskIfNeeded() - } var err error if mb, err = fs.newMsgBlockForWrite(); err != nil { return err @@ -4311,17 +4343,16 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { } // Insert into dmap all entries and place last as marker. - now := time.Now() - nowts := now.UnixNano() + now := ats.AccessTime() lseq := seq + num - 1 mb.mu.Lock() // If we are empty update meta directly. if mb.msgs == 0 { atomic.StoreUint64(&mb.last.seq, lseq) - mb.last.ts = nowts + mb.last.ts = now atomic.StoreUint64(&mb.first.seq, lseq+1) - mb.first.ts = nowts + mb.first.ts = 0 } else { for ; seq <= lseq; seq++ { mb.dmap.Insert(seq) @@ -4330,13 +4361,13 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { mb.mu.Unlock() // Write out our placeholder. - mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true) + mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, now, true) // Now update FS accounting. // Update fs state. - fs.state.LastSeq, fs.state.LastTime = lseq, now + fs.state.LastSeq, fs.state.LastTime = lseq, time.Unix(0, now).UTC() if fs.state.Msgs == 0 { - fs.state.FirstSeq, fs.state.FirstTime = lseq+1, now + fs.state.FirstSeq, fs.state.FirstTime = lseq+1, time.Time{} } // Mark as dirty for stream state. @@ -4759,18 +4790,19 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( mb.removeSeqPerSubject(sm.subj, seq) fs.removePerSubject(sm.subj) - if secure { - // Grab record info. - ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq)) - if err := mb.eraseMsg(seq, int(ri), int(rl)); err != nil { - return false, err - } - } - fifo := seq == atomic.LoadUint64(&mb.first.seq) isLastBlock := mb == fs.lmb isEmpty := mb.msgs == 0 + // If erase but block is empty, we can simply remove the block later. + if secure && !isEmpty { + // Grab record info. + ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq)) + if err := mb.eraseMsg(seq, int(ri), int(rl), isLastBlock); err != nil { + return false, err + } + } + if fifo { mb.selectNextFirst() if !isEmpty { @@ -5150,7 +5182,7 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) { } // Lock should be held. -func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error { +func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int, isLastBlock bool) error { var le = binary.LittleEndian var hdr [msgHdrSize]byte @@ -5192,35 +5224,23 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error { // Disk if mb.cache.off+mb.cache.wp > ri { - <-dios - mfd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms) - dios <- struct{}{} - if err != nil { - return err - } - defer mfd.Close() - if _, err = mfd.WriteAt(nbytes, int64(ri)); err == nil { - mfd.Sync() - } - if err != nil { + if err := mb.atomicOverwriteFile(mb.cache.buf, !isLastBlock); err != nil { return err } } return nil } -// Truncate this message block to the storedMsg. -func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { - mb.mu.Lock() - defer mb.mu.Unlock() - +// Truncate this message block to the tseq and ts. +// Lock should be held. +func (mb *msgBlock) truncate(tseq uint64, ts int64) (nmsgs, nbytes uint64, err error) { // Make sure we are loaded to process messages etc. if err := mb.loadMsgsWithLock(); err != nil { return 0, 0, err } // Calculate new eof using slot info from our new last sm. - ri, rl, _, err := mb.slotInfo(int(sm.seq - mb.cache.fseq)) + ri, rl, _, err := mb.slotInfo(int(tseq - mb.cache.fseq)) if err != nil { return 0, 0, err } @@ -5232,7 +5252,7 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { checkDmap := mb.dmap.Size() > 0 var smv StoreMsg - for seq := atomic.LoadUint64(&mb.last.seq); seq > sm.seq; seq-- { + for seq := atomic.LoadUint64(&mb.last.seq); seq > tseq; seq-- { if checkDmap { if mb.dmap.Exists(seq) { // Delete and skip to next. @@ -5266,46 +5286,19 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { if err != nil { return 0, 0, fmt.Errorf("failed to load block from disk: %w", err) } - if mb.bek != nil && len(buf) > 0 { - bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) - if err != nil { - return 0, 0, err - } - mb.bek = bek - mb.bek.XORKeyStream(buf, buf) + if err = mb.encryptOrDecryptIfNeeded(buf); err != nil { + return 0, 0, err } - buf, err = mb.decompressIfNeeded(buf) - if err != nil { + if buf, err = mb.decompressIfNeeded(buf); err != nil { return 0, 0, fmt.Errorf("failed to decompress block: %w", err) } buf = buf[:eof] copy(mb.lchk[0:], buf[:len(buf)-checksumSize]) - buf, err = mb.cmp.Compress(buf) - if err != nil { - return 0, 0, fmt.Errorf("failed to recompress block: %w", err) + // We did decompress but don't recompress the truncated buffer here since we're the last block + // and would otherwise have compressed data and allow to write uncompressed data in the same block. + if err = mb.atomicOverwriteFile(buf, false); err != nil { + return 0, 0, err } - meta := &CompressionInfo{ - Algorithm: mb.cmp, - OriginalSize: uint64(eof), - } - buf = append(meta.MarshalMetadata(), buf...) - if mb.bek != nil && len(buf) > 0 { - bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) - if err != nil { - return 0, 0, err - } - mb.bek = bek - mb.bek.XORKeyStream(buf, buf) - } - n, err := mb.writeAt(buf, 0) - if err != nil { - return 0, 0, fmt.Errorf("failed to rewrite compressed block: %w", err) - } - if n != len(buf) { - return 0, 0, fmt.Errorf("short write (%d != %d)", n, len(buf)) - } - mb.mfd.Truncate(int64(len(buf))) - mb.mfd.Sync() } else if mb.mfd != nil { mb.mfd.Truncate(eof) mb.mfd.Sync() @@ -5318,8 +5311,8 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { } // Update our last msg. - atomic.StoreUint64(&mb.last.seq, sm.seq) - mb.last.ts = sm.ts + atomic.StoreUint64(&mb.last.seq, tseq) + mb.last.ts = ts // Clear our cache. mb.clearCacheAndOffset() @@ -5383,7 +5376,11 @@ func (fs *fileStore) selectNextFirst() { mb := fs.blks[0] mb.mu.RLock() fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq) - fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() + if mb.first.ts == 0 { + fs.state.FirstTime = time.Time{} + } else { + fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() + } mb.mu.RUnlock() } else { // Could not find anything, so treat like purge @@ -5649,7 +5646,7 @@ func (fs *fileStore) expireMsgs() { // if it was the last message of that particular subject that we just deleted. if sdmEnabled { if last, ok := fs.shouldProcessSdm(seq, sm.subj); ok { - sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0 + sdm := last && isSubjectDeleteMarker(sm.hdr) fs.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL) } } else { @@ -5682,7 +5679,7 @@ func (fs *fileStore) expireMsgs() { if ttlSdm == nil { ttlSdm = make(map[string][]SDMBySubj, 1) } - ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0}) + ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)}) } else { // Collect sequences to remove. Don't remove messages inline here, // as that releases the lock and THW is not thread-safe. @@ -6113,14 +6110,6 @@ func (fs *fileStore) checkLastBlock(rl uint64) (lmb *msgBlock, err error) { lmb = fs.lmb rbytes := lmb.blkSize() if lmb == nil || (rbytes > 0 && rbytes+rl > fs.fcfg.BlockSize) { - if lmb != nil { - lmb.flushPendingMsgs() - if fs.fcfg.Compression != NoCompression { - // We've now reached the end of this message block, if we want - // to compress blocks then now's the time to do it. - go lmb.recompressOnDiskIfNeeded() - } - } if lmb, err = fs.newMsgBlockForWrite(); err != nil { return nil, err } @@ -6173,13 +6162,9 @@ func (fs *fileStore) writeTombstoneNoFlush(seq uint64, ts int64) error { return lmb.writeTombstoneNoFlush(seq, ts) } +// Lock should be held. func (mb *msgBlock) recompressOnDiskIfNeeded() error { alg := mb.fs.fcfg.Compression - mb.mu.Lock() - defer mb.mu.Unlock() - - origFN := mb.mfn // The original message block on disk. - tmpFN := mb.mfn + compressTmpSuffix // The compressed block will be written here. // Open up the file block and read in the entire contents into memory. // One of two things will happen: @@ -6188,7 +6173,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // 2. The block will be uncompressed, in which case we will compress it // and then write it back out to disk, re-encrypting if necessary. <-dios - origBuf, err := os.ReadFile(origFN) + origBuf, err := os.ReadFile(mb.mfn) dios <- struct{}{} if err != nil { @@ -6200,13 +6185,8 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // compression can be as efficient as possible on the raw data, whereas // the encrypted ciphertext will not compress anywhere near as well. // The block encryption also covers the optional compression metadata. - if mb.bek != nil && len(origBuf) > 0 { - bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) - if err != nil { - return err - } - mb.bek = bek - mb.bek.XORKeyStream(origBuf, origBuf) + if err = mb.encryptOrDecryptIfNeeded(origBuf); err != nil { + return err } meta := &CompressionInfo{} @@ -6231,6 +6211,19 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { } } + return mb.atomicOverwriteFile(origBuf, true) +} + +// Lock should be held. +func (mb *msgBlock) atomicOverwriteFile(buf []byte, allowCompress bool) error { + if mb.mfd != nil { + mb.closeFDsLockedNoCheck() + defer mb.enableForWriting(mb.fs.fip) + } + + origFN := mb.mfn // The original message block on disk. + tmpFN := mb.mfn + blkTmpSuffix // The new block will be written here. + // Rather than modifying the existing block on disk (which is a dangerous // operation if something goes wrong), create a new temporary file. We will // write out the new block here and then swap the files around afterwards @@ -6249,41 +6242,37 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { return err } - // The original buffer at this point is uncompressed, so we will now compress - // it if needed. Note that if the selected algorithm is NoCompression, the - // Compress function will just return the input buffer unmodified. - cmpBuf, err := alg.Compress(origBuf) - if err != nil { - return errorCleanup(fmt.Errorf("failed to compress block: %w", err)) - } + alg := NoCompression + if calg := mb.fs.fcfg.Compression; calg != NoCompression && allowCompress { + alg = calg + // The original buffer at this point is uncompressed, so we will now compress + // it if needed. Note that if the selected algorithm is NoCompression, the + // Compress function will just return the input buffer unmodified. + if buf, err = alg.Compress(buf); err != nil { + return errorCleanup(fmt.Errorf("failed to compress block: %w", err)) + } - // We only need to write out the metadata header if compression is enabled. - // If we're trying to uncompress the file on disk at this point, don't bother - // writing metadata. - if alg != NoCompression { + // We only need to write out the metadata header if compression is enabled. + // If we're trying to uncompress the file on disk at this point, don't bother + // writing metadata. meta := &CompressionInfo{ Algorithm: alg, - OriginalSize: uint64(len(origBuf)), + OriginalSize: uint64(len(buf)), } - cmpBuf = append(meta.MarshalMetadata(), cmpBuf...) + buf = append(meta.MarshalMetadata(), buf...) } // Re-encrypt the block if necessary. - if mb.bek != nil && len(cmpBuf) > 0 { - bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) - if err != nil { - return errorCleanup(err) - } - mb.bek = bek - mb.bek.XORKeyStream(cmpBuf, cmpBuf) + if err = mb.encryptOrDecryptIfNeeded(buf); err != nil { + return errorCleanup(err) } // Write the new block data (which might be compressed or encrypted) to the // temporary file. - if n, err := tmpFD.Write(cmpBuf); err != nil { + if n, err := tmpFD.Write(buf); err != nil { return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err)) - } else if n != len(cmpBuf) { - return errorCleanup(fmt.Errorf("short write to temporary file (%d != %d)", n, len(cmpBuf))) + } else if n != len(buf) { + return errorCleanup(fmt.Errorf("short write to temporary file (%d != %d)", n, len(buf))) } if err := tmpFD.Sync(); err != nil { return errorCleanup(fmt.Errorf("failed to sync temporary file: %w", err)) @@ -6303,11 +6292,12 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { mb.cmp = alg // Also update rbytes - mb.rbytes = uint64(len(cmpBuf)) + mb.rbytes = uint64(len(buf)) return nil } +// Lock should be held. func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) { var meta CompressionInfo if n, err := meta.UnmarshalMetadata(buf); err != nil { @@ -6328,6 +6318,19 @@ func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) { } } +// Lock should be held. +func (mb *msgBlock) encryptOrDecryptIfNeeded(buf []byte) error { + if mb.bek != nil && len(buf) > 0 { + bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) + if err != nil { + return err + } + mb.bek = bek + mb.bek.XORKeyStream(buf, buf) + } + return nil +} + // Lock should be held. func (mb *msgBlock) ensureRawBytesLoaded() error { if mb.rbytes > 0 { @@ -6631,7 +6634,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // If we have a hole fill it. for dseq := mbFirstSeq + uint64(len(idx)); dseq < seq; dseq++ { idx = append(idx, dbit) - if dms == 0 { + if dms == 0 && dseq != 0 { mb.dmap.Insert(dseq) } } @@ -6645,7 +6648,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { } // Make sure our dmap has this entry if it was erased. - if erased && dms == 0 { + if erased && dms == 0 && seq != 0 { mb.dmap.Insert(seq) } @@ -7009,15 +7012,9 @@ checkCache: mb.clearCacheAndOffset() // Check if we need to decrypt. - if mb.bek != nil && len(buf) > 0 { - bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) - if err != nil { - return err - } - mb.bek = bek - mb.bek.XORKeyStream(buf, buf) + if err = mb.encryptOrDecryptIfNeeded(buf); err != nil { + return err } - // Check for compression. if buf, err = mb.decompressIfNeeded(buf); err != nil { return err @@ -8094,7 +8091,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint firstSeqNeedsUpdate = firstSeqNeedsUpdate || seq == fs.state.FirstSeq } else if seq == fs.state.FirstSeq { fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq) // new one. - fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() + if mb.first.ts == 0 { + fs.state.FirstTime = time.Time{} + } else { + fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() + } } } else { // Out of order delete. @@ -8143,7 +8144,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint return purged, err } } - // Flush any pending. If we change blocks the checkLastBlock() will flush any pending for us. + // Flush any pending. If we change blocks the newMsgBlockForWrite() will flush any pending for us. if lmb := fs.lmb; lmb != nil { lmb.flushPendingMsgs() } @@ -8435,8 +8436,17 @@ SKIP: smb.mu.Unlock() // Write any tombstones as needed. - for _, tomb := range tombs { - fs.writeTombstone(tomb.seq, tomb.ts) + // When writing multiple tombstones we will flush at the end. + if len(tombs) > 0 { + for _, tomb := range tombs { + if err := fs.writeTombstoneNoFlush(tomb.seq, tomb.ts); err != nil { + return purged, err + } + } + // Flush any pending. If we change blocks the newMsgBlockForWrite() will flush any pending for us. + if lmb := fs.lmb; lmb != nil { + lmb.flushPendingMsgs() + } } if deleted > 0 { @@ -8601,55 +8611,118 @@ func (fs *fileStore) Truncate(seq uint64) error { return ErrStoreSnapshotInProgress } - nlmb := fs.selectMsgBlock(seq) - if nlmb == nil { - fs.mu.Unlock() - return ErrInvalidSequence - } - lsm, _, _ := nlmb.fetchMsgNoCopy(seq, nil) - if lsm == nil { - fs.mu.Unlock() - return ErrInvalidSequence + var lsm *StoreMsg + smb := fs.selectMsgBlock(seq) + if smb != nil { + lsm, _, _ = smb.fetchMsgNoCopy(seq, nil) } - // Set lmb to nlmb and make sure writeable. - fs.lmb = nlmb - if err := nlmb.enableForWriting(fs.fip); err != nil { - fs.mu.Unlock() + // Reset last so new block doesn't contain truncated sequences/timestamps. + var lastTime int64 + if lsm != nil { + lastTime = lsm.ts + } else if smb != nil { + lastTime = smb.last.ts + } else { + lastTime = fs.state.LastTime.UnixNano() + } + fs.state.LastSeq = seq + fs.state.LastTime = time.Unix(0, lastTime).UTC() + + // Always create a new write block for any tombstones. + // We'll truncate the selected message block as the last step, so can't write tombstones to it. + // If we end up not needing to write tombstones, this block will be cleaned up at the end. + tmb, err := fs.newMsgBlockForWrite() + if err != nil { return err } - // Collect all tombstones, we want to put these back so we can survive - // a restore without index.db properly. - var tombs []msgId - tombs = append(tombs, nlmb.tombs()...) + + // If the selected block is not found or the message was deleted, we'll need to write a tombstone + // at the truncated sequence so we don't roll backward on our last sequence and timestamp. + if lsm == nil { + fs.writeTombstone(seq, lastTime) + } var purged, bytes uint64 - // Truncate our new last message block. - nmsgs, nbytes, err := nlmb.truncate(lsm) - if err != nil { - fs.mu.Unlock() - return fmt.Errorf("nlmb.truncate: %w", err) - } - // Account for the truncated msgs and bytes. - purged += nmsgs - bytes += nbytes - // Remove any left over msg blocks. - getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] } - for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() { + getLastMsgBlock := func() *msgBlock { + // Start at one before last, tmb will be the last most of the time + // unless a new block gets added for tombstones. + for i := len(fs.blks) - 2; i >= 0; i-- { + if mb := fs.blks[i]; mb.index < tmb.index { + return mb + } + } + return nil + } + for mb := getLastMsgBlock(); mb != nil && mb != smb; mb = getLastMsgBlock() { mb.mu.Lock() - // We do this to load tombs. - tombs = append(tombs, mb.tombsLocked()...) purged += mb.msgs bytes += mb.bytes + + // We could have tombstones for messages before the truncated sequence. + // Need to store those for blocks we're about to remove. + if tombs := mb.tombsLocked(); len(tombs) > 0 { + // Temporarily unlock while we write tombstones. + mb.mu.Unlock() + for _, tomb := range tombs { + if tomb.seq < seq { + fs.writeTombstone(tomb.seq, tomb.ts) + } + } + mb.mu.Lock() + } fs.removeMsgBlock(mb) mb.mu.Unlock() } + hasWrittenTombstones := len(tmb.tombs()) > 0 + if smb != nil { + // Make sure writeable. + smb.mu.Lock() + if err := smb.enableForWriting(fs.fip); err != nil { + smb.mu.Unlock() + fs.mu.Unlock() + return err + } + + // Truncate our selected message block. + nmsgs, nbytes, err := smb.truncate(seq, lastTime) + if err != nil { + smb.mu.Unlock() + fs.mu.Unlock() + return fmt.Errorf("smb.truncate: %w", err) + } + // Account for the truncated msgs and bytes. + purged += nmsgs + bytes += nbytes + + // The selected message block is not the last anymore, need to close down resources. + if hasWrittenTombstones { + // Quit our loops. + if smb.qch != nil { + close(smb.qch) + smb.qch = nil + } + smb.closeFDsLockedNoCheck() + smb.recompressOnDiskIfNeeded() + } + smb.mu.Unlock() + } + + // If no tombstones were written, we can remove the block and + // purely rely on the selected block as the last block. + if !hasWrittenTombstones { + fs.lmb = smb + tmb.mu.Lock() + fs.removeMsgBlock(tmb) + tmb.mu.Unlock() + } + // Reset last. - fs.state.LastSeq = lsm.seq - fs.state.LastTime = time.Unix(0, lsm.ts).UTC() + fs.state.LastSeq = seq + fs.state.LastTime = time.Unix(0, lastTime).UTC() // Update msgs and bytes. if purged > fs.state.Msgs { purged = fs.state.Msgs @@ -8663,16 +8736,6 @@ func (fs *fileStore) Truncate(seq uint64) error { // Reset our subject lookup info. fs.resetGlobalPerSubjectInfo() - // Always create new write block. - fs.newMsgBlockForWrite() - - // Write any tombstones as needed. - for _, tomb := range tombs { - if tomb.seq <= lsm.seq { - fs.writeTombstone(tomb.seq, tomb.ts) - } - } - // Any existing state file no longer applicable. We will force write a new one // after we release the lock. os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go index d5e3c9f11c..c119da6333 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go @@ -40,15 +40,15 @@ import ( // JetStreamConfig determines this server's configuration. // MaxMemory and MaxStore are in bytes. type JetStreamConfig struct { - MaxMemory int64 `json:"max_memory"` - MaxStore int64 `json:"max_storage"` - StoreDir string `json:"store_dir,omitempty"` - SyncInterval time.Duration `json:"sync_interval,omitempty"` - SyncAlways bool `json:"sync_always,omitempty"` - Domain string `json:"domain,omitempty"` - CompressOK bool `json:"compress_ok,omitempty"` - UniqueTag string `json:"unique_tag,omitempty"` - Strict bool `json:"strict,omitempty"` + MaxMemory int64 `json:"max_memory"` // MaxMemory is the maximum size of memory type streams + MaxStore int64 `json:"max_storage"` // MaxStore is the maximum size of file store type streams + StoreDir string `json:"store_dir,omitempty"` // StoreDir is where storage files are stored + SyncInterval time.Duration `json:"sync_interval,omitempty"` // SyncInterval is how frequently we sync to disk in the background by calling fsync + SyncAlways bool `json:"sync_always,omitempty"` // SyncAlways indicates flushes are done after every write + Domain string `json:"domain,omitempty"` // Domain is the JetStream domain + CompressOK bool `json:"compress_ok,omitempty"` // CompressOK indicates if compression is supported + UniqueTag string `json:"unique_tag,omitempty"` // UniqueTag is the unique tag assigned to this instance + Strict bool `json:"strict,omitempty"` // Strict indicates if strict JSON parsing is performed } // Statistics about JetStream for this server. @@ -91,11 +91,12 @@ type JetStreamAccountStats struct { Tiers map[string]JetStreamTier `json:"tiers,omitempty"` // indexed by tier name } +// JetStreamAPIStats holds stats about the API usage for this server type JetStreamAPIStats struct { - Level int `json:"level"` - Total uint64 `json:"total"` - Errors uint64 `json:"errors"` - Inflight uint64 `json:"inflight,omitempty"` + Level int `json:"level"` // Level is the active API level this server implements + Total uint64 `json:"total"` // Total is the total API requests received since start + Errors uint64 `json:"errors"` // Errors is the total API requests that resulted in error responses + Inflight uint64 `json:"inflight,omitempty"` // Inflight are the number of API requests currently being served } // This is for internal accounting for JetStream for this server. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 84f3675dff..edb54368bd 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -1225,6 +1225,8 @@ func (js *jetStream) monitorCluster() { doSnapshot() return case <-rqch: + // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. + doSnapshot() return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. @@ -1280,10 +1282,10 @@ func (js *jetStream) monitorCluster() { } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() } - ce.ReturnToPool() } else { s.Warnf("Error applying JetStream cluster entries: %v", err) } + ce.ReturnToPool() } aq.recycle(&ces) @@ -2455,6 +2457,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps ne, nb = n.Applied(ce.Index) ce.ReturnToPool() } else { + // Make sure to clean up. + ce.ReturnToPool() // Our stream was closed out from underneath of us, simply return here. if err == errStreamClosed || err == errCatchupStreamStopped || err == ErrServerNotRunning { aq.recycle(&ces) @@ -4862,13 +4866,14 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if n.NeedSnapshot() { doSnapshot(true) } - } else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { + continue + } + if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { var ne, nb uint64 // We can't guarantee writes are flushed while we're shutting down. Just rely on replay during recovery. if !js.isShuttingDown() { ne, nb = n.Applied(ce.Index) } - ce.ReturnToPool() // If we have at least min entries to compact, go ahead and snapshot/compact. if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { doSnapshot(false) @@ -4876,6 +4881,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } else if err != errConsumerClosed { s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) } + ce.ReturnToPool() } aq.recycle(&ces) @@ -5058,8 +5064,20 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea o.ldt = time.Now() // Need to send message to the client, since we have quorum to do so now. if pmsg, ok := o.pendingDeliveries[sseq]; ok { + // Copy delivery subject and sequence first, as the send returns it to the pool and clears it. + dsubj, seq := pmsg.dsubj, pmsg.seq o.outq.send(pmsg) delete(o.pendingDeliveries, sseq) + + // Might need to send a request timeout after sending the last replicated delivery. + if wd, ok := o.waitingDeliveries[dsubj]; ok && wd.seq == seq { + if wd.pn > 0 || wd.pb > 0 { + hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wd.pn, JSPullRequestPendingBytes, wd.pb) + o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + } + wd.recycle() + delete(o.waitingDeliveries, dsubj) + } } o.mu.Unlock() if err != nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_errors_generated.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_errors_generated.go index 30ed884e0f..162ca4f027 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_errors_generated.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_errors_generated.go @@ -197,6 +197,9 @@ const ( // JSConsumerPushMaxWaitingErr consumer in push mode can not set max waiting JSConsumerPushMaxWaitingErr ErrorIdentifier = 10080 + // JSConsumerPushWithPriorityGroupErr priority groups can not be used with push consumers + JSConsumerPushWithPriorityGroupErr ErrorIdentifier = 10178 + // JSConsumerReplacementWithDifferentNameErr consumer replacement durable config not the same JSConsumerReplacementWithDifferentNameErr ErrorIdentifier = 10106 @@ -570,6 +573,7 @@ var ( JSConsumerPullRequiresAckErr: {Code: 400, ErrCode: 10084, Description: "consumer in pull mode requires explicit ack policy on workqueue stream"}, JSConsumerPullWithRateLimitErr: {Code: 400, ErrCode: 10086, Description: "consumer in pull mode can not have rate limit set"}, JSConsumerPushMaxWaitingErr: {Code: 400, ErrCode: 10080, Description: "consumer in push mode can not set max waiting"}, + JSConsumerPushWithPriorityGroupErr: {Code: 400, ErrCode: 10178, Description: "priority groups can not be used with push consumers"}, JSConsumerReplacementWithDifferentNameErr: {Code: 400, ErrCode: 10106, Description: "consumer replacement durable config not the same"}, JSConsumerReplicasExceedsStream: {Code: 400, ErrCode: 10126, Description: "consumer config replica count exceeds parent stream"}, JSConsumerReplicasShouldMatchStream: {Code: 400, ErrCode: 10134, Description: "consumer config replicas must match interest retention stream's replicas"}, @@ -1397,6 +1401,16 @@ func NewJSConsumerPushMaxWaitingError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerPushMaxWaitingErr] } +// NewJSConsumerPushWithPriorityGroupError creates a new JSConsumerPushWithPriorityGroupErr error: "priority groups can not be used with push consumers" +func NewJSConsumerPushWithPriorityGroupError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerPushWithPriorityGroupErr] +} + // NewJSConsumerReplacementWithDifferentNameError creates a new JSConsumerReplacementWithDifferentNameErr error: "consumer replacement durable config not the same" func NewJSConsumerReplacementWithDifferentNameError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jwt.go b/vendor/github.com/nats-io/nats-server/v2/server/jwt.go index e8da5213cc..04d7dc60a3 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jwt.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jwt.go @@ -70,11 +70,20 @@ func wipeSlice(buf []byte) { func validateTrustedOperators(o *Options) error { if len(o.TrustedOperators) == 0 { // if we have no operator, default sentinel shouldn't be set - if o.DefaultSentinel != "" { + if o.DefaultSentinel != _EMPTY_ { return fmt.Errorf("default sentinel requires operators and accounts") } return nil } + if o.DefaultSentinel != _EMPTY_ { + juc, err := jwt.DecodeUserClaims(o.DefaultSentinel) + if err != nil { + return fmt.Errorf("default sentinel JWT not valid") + } + if !juc.BearerToken { + return fmt.Errorf("default sentinel must be a bearer token") + } + } if o.AccountResolver == nil { return fmt.Errorf("operators require an account resolver to be configured") } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index 28b25f9780..133b6408f8 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "github.com/nats-io/nats-server/v2/server/ats" + "github.com/nats-io/nats-server/v2/server/avl" "github.com/nats-io/nats-server/v2/server/gsl" "github.com/nats-io/nats-server/v2/server/stree" @@ -71,6 +73,9 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) { } } + // Register with access time service. + ats.Register() + return ms, nil } @@ -86,7 +91,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { ms.cfg = *cfg // Create or delete the THW if needed. if cfg.AllowMsgTTL && ms.ttls == nil { - ms.ttls = thw.NewHashWheel() + ms.recoverTTLState() } else if !cfg.AllowMsgTTL && ms.ttls != nil { ms.ttls = nil } @@ -125,6 +130,30 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { return nil } +// Lock should be held. +func (ms *memStore) recoverTTLState() { + ms.ttls = thw.NewHashWheel() + if ms.state.Msgs == 0 { + return + } + + var ( + seq uint64 + smv StoreMsg + sm *StoreMsg + ) + defer ms.resetAgeChk(0) + for sm, seq, _ = ms.loadNextMsgLocked(fwcs, true, 0, &smv); sm != nil; sm, seq, _ = ms.loadNextMsgLocked(fwcs, true, seq+1, &smv) { + if len(sm.hdr) == 0 { + continue + } + if ttl, _ := getMessageTTL(sm.hdr); ttl > 0 { + expires := time.Duration(sm.ts) + (time.Second * time.Duration(ttl)) + ms.ttls.Add(seq, int64(expires)) + } + } +} + // Stores a raw message with expected sequence number and timestamp. // Lock should be held. func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, ttl int64) error { @@ -286,7 +315,7 @@ func (ms *memStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, i // SkipMsg will use the next sequence number but not store anything. func (ms *memStore) SkipMsg() uint64 { // Grab time. - now := time.Now().UTC() + now := time.Unix(0, ats.AccessTime()).UTC() ms.mu.Lock() seq := ms.state.LastSeq + 1 @@ -294,7 +323,7 @@ func (ms *memStore) SkipMsg() uint64 { ms.state.LastTime = now if ms.state.Msgs == 0 { ms.state.FirstSeq = seq + 1 - ms.state.FirstTime = now + ms.state.FirstTime = time.Time{} } else { ms.dmap.Insert(seq) } @@ -305,7 +334,7 @@ func (ms *memStore) SkipMsg() uint64 { // Skip multiple msgs. func (ms *memStore) SkipMsgs(seq uint64, num uint64) error { // Grab time. - now := time.Now().UTC() + now := time.Unix(0, ats.AccessTime()).UTC() ms.mu.Lock() defer ms.mu.Unlock() @@ -322,7 +351,7 @@ func (ms *memStore) SkipMsgs(seq uint64, num uint64) error { ms.state.LastSeq = lseq ms.state.LastTime = now if ms.state.Msgs == 0 { - ms.state.FirstSeq, ms.state.FirstTime = lseq+1, now + ms.state.FirstSeq, ms.state.FirstTime = lseq+1, time.Time{} } else { for ; seq <= lseq; seq++ { ms.dmap.Insert(seq) @@ -1075,7 +1104,7 @@ func (ms *memStore) expireMsgs() { } if sdmEnabled { if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok { - sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0 + sdm := last && isSubjectDeleteMarker(sm.hdr) ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL) } } else { @@ -1105,7 +1134,7 @@ func (ms *memStore) expireMsgs() { if ttlSdm == nil { ttlSdm = make(map[string][]SDMBySubj, 1) } - ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0}) + ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)}) return false } } else { @@ -1292,7 +1321,9 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) { ms.state.FirstTime = time.Time{} ms.state.Bytes = 0 ms.state.Msgs = 0 - ms.msgs = make(map[uint64]*StoreMsg) + if ms.msgs != nil { + ms.msgs = make(map[uint64]*StoreMsg) + } ms.fss = stree.NewSubjectTree[SimpleState]() ms.dmap.Empty() ms.sdm.empty() @@ -1421,9 +1452,9 @@ func (ms *memStore) Truncate(seq uint64) error { ms.mu.Lock() lsm, ok := ms.msgs[seq] - if !ok { - ms.mu.Unlock() - return ErrInvalidSequence + lastTime := ms.state.LastTime + if ok && lsm != nil { + lastTime = time.Unix(0, lsm.ts).UTC() } for i := ms.state.LastSeq; i > seq; i-- { @@ -1438,8 +1469,8 @@ func (ms *memStore) Truncate(seq uint64) error { } } // Reset last. - ms.state.LastSeq = lsm.seq - ms.state.LastTime = time.Unix(0, lsm.ts).UTC() + ms.state.LastSeq = seq + ms.state.LastTime = lastTime // Update msgs and bytes. if purged > ms.state.Msgs { purged = ms.state.Msgs @@ -1585,7 +1616,11 @@ func (ms *memStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *S func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) { ms.mu.Lock() defer ms.mu.Unlock() + return ms.loadNextMsgLocked(filter, wc, start, smp) +} +// Lock should be held. +func (ms *memStore) loadNextMsgLocked(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) { if start < ms.state.FirstSeq { start = ms.state.FirstSeq } @@ -1927,15 +1962,24 @@ func (ms *memStore) Delete() error { } func (ms *memStore) Stop() error { - // These can't come back, so stop is same as Delete. - ms.Purge() ms.mu.Lock() + if ms.msgs == nil { + ms.mu.Unlock() + return nil + } if ms.ageChk != nil { ms.ageChk.Stop() ms.ageChk = nil } ms.msgs = nil ms.mu.Unlock() + + // These can't come back, so stop is same as Delete. + ms.Purge() + + // Unregister from the access time service. + ats.Unregister() + return nil } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go index 6a0f9c6ac8..b160d9d89a 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go @@ -1184,184 +1184,185 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { // Varz will output server information on the monitoring port at /varz. type Varz struct { - ID string `json:"server_id"` - Name string `json:"server_name"` - Version string `json:"version"` - Proto int `json:"proto"` - GitCommit string `json:"git_commit,omitempty"` - GoVersion string `json:"go"` - Host string `json:"host"` - Port int `json:"port"` - AuthRequired bool `json:"auth_required,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` - IP string `json:"ip,omitempty"` - ClientConnectURLs []string `json:"connect_urls,omitempty"` - WSConnectURLs []string `json:"ws_connect_urls,omitempty"` - MaxConn int `json:"max_connections"` - MaxSubs int `json:"max_subscriptions,omitempty"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HTTPHost string `json:"http_host"` - HTTPPort int `json:"http_port"` - HTTPBasePath string `json:"http_base_path"` - HTTPSPort int `json:"https_port"` - AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int32 `json:"max_control_line"` - MaxPayload int `json:"max_payload"` - MaxPending int64 `json:"max_pending"` - Cluster ClusterOptsVarz `json:"cluster,omitempty"` - Gateway GatewayOptsVarz `json:"gateway,omitempty"` - LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` - MQTT MQTTOptsVarz `json:"mqtt,omitempty"` - Websocket WebsocketOptsVarz `json:"websocket,omitempty"` - JetStream JetStreamVarz `json:"jetstream,omitempty"` - TLSTimeout float64 `json:"tls_timeout"` - WriteDeadline time.Duration `json:"write_deadline"` - Start time.Time `json:"start"` - Now time.Time `json:"now"` - Uptime string `json:"uptime"` - Mem int64 `json:"mem"` - Cores int `json:"cores"` - MaxProcs int `json:"gomaxprocs"` - MemLimit int64 `json:"gomemlimit,omitempty"` - CPU float64 `json:"cpu"` - Connections int `json:"connections"` - TotalConnections uint64 `json:"total_connections"` - Routes int `json:"routes"` - Remotes int `json:"remotes"` - Leafs int `json:"leafnodes"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - SlowConsumers int64 `json:"slow_consumers"` - Subscriptions uint32 `json:"subscriptions"` - HTTPReqStats map[string]uint64 `json:"http_req_stats"` - ConfigLoadTime time.Time `json:"config_load_time"` - ConfigDigest string `json:"config_digest"` - Tags jwt.TagList `json:"tags,omitempty"` - TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"` - TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"` - SystemAccount string `json:"system_account,omitempty"` - PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` - OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` - SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` + ID string `json:"server_id"` // ID is the unique server ID generated at start + Name string `json:"server_name"` // Name is the configured server name, equals ID when not set + Version string `json:"version"` // Version is the version of the running server + Proto int `json:"proto"` // Proto is the protocol version this server supports + GitCommit string `json:"git_commit,omitempty"` // GitCommit is the git repository commit hash that the build corresponds with + GoVersion string `json:"go"` // GoVersion is the version of Go used to build this binary + Host string `json:"host"` // Host is the hostname the server runs on + Port int `json:"port"` // Port is the port the server listens on for client connections + AuthRequired bool `json:"auth_required,omitempty"` // AuthRequired indicates if users are required to authenticate to join the server + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if connections must use TLS when connecting to this server + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full TLS verification will be performed + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if the OCSP protocol will be used to verify peers + IP string `json:"ip,omitempty"` // IP is the IP address the server listens on if set + ClientConnectURLs []string `json:"connect_urls,omitempty"` // ClientConnectURLs is the list of URLs NATS clients can use to connect to this server + WSConnectURLs []string `json:"ws_connect_urls,omitempty"` // WSConnectURLs is the list of URLs websocket clients can use to connect to this server + MaxConn int `json:"max_connections"` // MaxConn is the maximum amount of connections the server can accept + MaxSubs int `json:"max_subscriptions,omitempty"` // MaxSubs is the maximum amount of subscriptions the server can manage + PingInterval time.Duration `json:"ping_interval"` // PingInterval is the interval the server will send PING messages during periods of inactivity on a connection + MaxPingsOut int `json:"ping_max"` // MaxPingsOut is the number of unanswered PINGs after which the connection will be considered stale + HTTPHost string `json:"http_host"` // HTTPHost is the HTTP host monitoring connections are accepted on + HTTPPort int `json:"http_port"` // HTTPPort is the port monitoring connections are accepted on + HTTPBasePath string `json:"http_base_path"` // HTTPBasePath is the path prefix for access to monitor endpoints + HTTPSPort int `json:"https_port"` // HTTPSPort is the HTTPS host monitoring connections are accepted on` + AuthTimeout float64 `json:"auth_timeout"` // AuthTimeout is the amount of seconds connections have to complete authentication + MaxControlLine int32 `json:"max_control_line"` // MaxControlLine is the amount of bytes a signal control message may be + MaxPayload int `json:"max_payload"` // MaxPayload is the maximum amount of bytes a message may have as payload + MaxPending int64 `json:"max_pending"` // MaxPending is the maximum amount of unprocessed bytes a connection may have + Cluster ClusterOptsVarz `json:"cluster,omitempty"` // Cluster is the Cluster state + Gateway GatewayOptsVarz `json:"gateway,omitempty"` // Gateway is the Super Cluster state + LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` // LeafNode is the Leafnode state + MQTT MQTTOptsVarz `json:"mqtt,omitempty"` // MQTT is the MQTT state + Websocket WebsocketOptsVarz `json:"websocket,omitempty"` // Websocket is the Websocket client state + JetStream JetStreamVarz `json:"jetstream,omitempty"` // JetStream is the JetStream state + TLSTimeout float64 `json:"tls_timeout"` // TLSTimeout is how long TLS operations have to complete + WriteDeadline time.Duration `json:"write_deadline"` // WriteDeadline is the maximum time writes to sockets have to complete + Start time.Time `json:"start"` // Start is time when the server was started + Now time.Time `json:"now"` // Now is the current time of the server + Uptime string `json:"uptime"` // Uptime is how long the server has been running + Mem int64 `json:"mem"` // Mem is the resident memory allocation + Cores int `json:"cores"` // Cores is the number of cores the process has access to + MaxProcs int `json:"gomaxprocs"` // MaxProcs is the configured GOMAXPROCS value + MemLimit int64 `json:"gomemlimit,omitempty"` // MemLimit is the configured GOMEMLIMIT value + CPU float64 `json:"cpu"` // CPU is the current total CPU usage + Connections int `json:"connections"` // Connections is the current connected connections + TotalConnections uint64 `json:"total_connections"` // TotalConnections is the total connections the server have ever handled + Routes int `json:"routes"` // Routes is the number of connected route servers + Remotes int `json:"remotes"` // Remotes is the configured route remote endpoints + Leafs int `json:"leafnodes"` // Leafs is the number connected leafnode clients + InMsgs int64 `json:"in_msgs"` // InMsgs is the number of messages this server received + OutMsgs int64 `json:"out_msgs"` // OutMsgs is the number of message this server sent + InBytes int64 `json:"in_bytes"` // InBytes is the number of bytes this server received + OutBytes int64 `json:"out_bytes"` // OutMsgs is the number of bytes this server sent + SlowConsumers int64 `json:"slow_consumers"` // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers + Subscriptions uint32 `json:"subscriptions"` // Subscriptions is the count of active subscriptions + HTTPReqStats map[string]uint64 `json:"http_req_stats"` // HTTPReqStats is the number of requests each HTTP endpoint received + ConfigLoadTime time.Time `json:"config_load_time"` // ConfigLoadTime is the time the configuration was loaded or reloaded + ConfigDigest string `json:"config_digest"` // ConfigDigest is a calculated hash of the current configuration + Tags jwt.TagList `json:"tags,omitempty"` // Tags are the tags assigned to the server in configuration + Metadata map[string]string `json:"metadata,omitempty"` // Metadata is the metadata assigned to the server in configuration + TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"` // TrustedOperatorsJwt is the JWTs for all trusted operators + TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"` // TrustedOperatorsClaim is the decoded claims for each trusted operator + SystemAccount string `json:"system_account,omitempty"` // SystemAccount is the name of the System account + PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` // PinnedAccountFail is how often user logon fails due to the issuer account not being pinned. + OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` // OCSPResponseCache is the state of the OCSP cache // OCSPResponseCache holds information about + SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats is statistics about all detected Slow Consumer } // JetStreamVarz contains basic runtime information about jetstream type JetStreamVarz struct { - Config *JetStreamConfig `json:"config,omitempty"` - Stats *JetStreamStats `json:"stats,omitempty"` - Meta *MetaClusterInfo `json:"meta,omitempty"` - Limits *JSLimitOpts `json:"limits,omitempty"` + Config *JetStreamConfig `json:"config,omitempty"` // Config is the active JetStream configuration + Stats *JetStreamStats `json:"stats,omitempty"` // Stats is the statistics for the JetStream server + Meta *MetaClusterInfo `json:"meta,omitempty"` // Meta is information about the JetStream metalayer + Limits *JSLimitOpts `json:"limits,omitempty"` // Limits are the configured JetStream limits } // ClusterOptsVarz contains monitoring cluster information type ClusterOptsVarz struct { - Name string `json:"name,omitempty"` - Host string `json:"addr,omitempty"` - Port int `json:"cluster_port,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - URLs []string `json:"urls,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - PoolSize int `json:"pool_size,omitempty"` + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections + Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size } // GatewayOptsVarz contains monitoring gateway information type GatewayOptsVarz struct { - Name string `json:"name,omitempty"` - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - Advertise string `json:"advertise,omitempty"` - ConnectRetries int `json:"connect_retries,omitempty"` - Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` - RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections + Port int `json:"port,omitempty"` // Port is the post gateway connections listens on + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients + ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make + Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes + RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected } // RemoteGatewayOptsVarz contains monitoring remote gateway information type RemoteGatewayOptsVarz struct { - Name string `json:"name"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - URLs []string `json:"urls,omitempty"` + Name string `json:"name"` // Name is the name of the remote gateway + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + URLs []string `json:"urls,omitempty"` // URLs is the list of Gateway URLs } // LeafNodeOptsVarz contains monitoring leaf node information type LeafNodeOptsVarz struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + Host string `json:"host,omitempty"` // Host is the host the server listens on + Port int `json:"port,omitempty"` // Port is the port the server listens on + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time Leafnode connections have to complete authentication + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"` // Remotes is state of configured Leafnode remotes + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed } // DenyRules Contains lists of subjects not allowed to be imported/exported type DenyRules struct { - Exports []string `json:"exports,omitempty"` - Imports []string `json:"imports,omitempty"` + Exports []string `json:"exports,omitempty"` // Exports are denied exports + Imports []string `json:"imports,omitempty"` // Imports are denied imports } // RemoteLeafOptsVarz contains monitoring remote leaf node information type RemoteLeafOptsVarz struct { - LocalAccount string `json:"local_account,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - URLs []string `json:"urls,omitempty"` - Deny *DenyRules `json:"deny,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + LocalAccount string `json:"local_account,omitempty"` // LocalAccount is the local account this leaf is logged into + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + URLs []string `json:"urls,omitempty"` // URLs is the list of URLs for the remote Leafnode connection + Deny *DenyRules `json:"deny,omitempty"` // Deny is the configured import and exports that the Leafnode may not access + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done } // MQTTOptsVarz contains monitoring MQTT information type MQTTOptsVarz struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NoAuthUser string `json:"no_auth_user,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - TLSMap bool `json:"tls_map,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` - JsDomain string `json:"js_domain,omitempty"` - AckWait time.Duration `json:"ack_wait,omitempty"` - MaxAckPending uint16 `json:"max_ack_pending,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + Host string `json:"host,omitempty"` // Host is the host the server listens on + Port int `json:"port,omitempty"` // Port is the port the server listens on + NoAuthUser string `json:"no_auth_user,omitempty"` // NoAuthUser is the user that will be used for unauthenticated connections + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is how long authentication has to complete + TLSMap bool `json:"tls_map,omitempty"` // TLSMap indicates if TLS Mapping is enabled + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` // TLSPinnedCerts is the list of certificates pinned to this connection + JsDomain string `json:"js_domain,omitempty"` // JsDomain is the JetStream domain used for MQTT state + AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete + MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done } // WebsocketOptsVarz contains monitoring websocket information type WebsocketOptsVarz struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - Advertise string `json:"advertise,omitempty"` - NoAuthUser string `json:"no_auth_user,omitempty"` - JWTCookie string `json:"jwt_cookie,omitempty"` - HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - NoTLS bool `json:"no_tls,omitempty"` - TLSMap bool `json:"tls_map,omitempty"` - TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` - SameOrigin bool `json:"same_origin,omitempty"` - AllowedOrigins []string `json:"allowed_origins,omitempty"` - Compression bool `json:"compression,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + Host string `json:"host,omitempty"` // Host is the host the server listens on + Port int `json:"port,omitempty"` // Port is the port the server listens on + Advertise string `json:"advertise,omitempty"` // Advertise is the connection URL the server advertises + NoAuthUser string `json:"no_auth_user,omitempty"` // NoAuthUser is the user that will be used for unauthenticated connections + JWTCookie string `json:"jwt_cookie,omitempty"` // JWTCookie is the name of a cookie the server will read for the connection JWT + HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"` // HandshakeTimeout is how long the connection has to complete the websocket setup + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is how long authentication has to complete + NoTLS bool `json:"no_tls,omitempty"` // NoTLS indicates if TLS is disabled + TLSMap bool `json:"tls_map,omitempty"` // TLSMap indicates if TLS Mapping is enabled + TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` // TLSPinnedCerts is the list of certificates pinned to this connection + SameOrigin bool `json:"same_origin,omitempty"` // SameOrigin indicates if same origin connections are allowed + AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins + Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done } // OCSPResponseCacheVarz contains OCSP response cache information type OCSPResponseCacheVarz struct { - Type string `json:"cache_type,omitempty"` - Hits int64 `json:"cache_hits,omitempty"` - Misses int64 `json:"cache_misses,omitempty"` - Responses int64 `json:"cached_responses,omitempty"` - Revokes int64 `json:"cached_revoked_responses,omitempty"` - Goods int64 `json:"cached_good_responses,omitempty"` - Unknowns int64 `json:"cached_unknown_responses,omitempty"` + Type string `json:"cache_type,omitempty"` // Type is the kind of cache being used + Hits int64 `json:"cache_hits,omitempty"` // Hits is how many times the cache was able to answer a request + Misses int64 `json:"cache_misses,omitempty"` // Misses is how many times the cache failed to answer a request + Responses int64 `json:"cached_responses,omitempty"` // Responses is how many responses are currently stored in the cache + Revokes int64 `json:"cached_revoked_responses,omitempty"` // Revokes is how many of the stored cache entries are revokes + Goods int64 `json:"cached_good_responses,omitempty"` // Goods is how many of the stored cache entries are good responses + Unknowns int64 `json:"cached_unknown_responses,omitempty"` // Unknowns is how many of the stored cache entries are unknown responses } // VarzOptions are the options passed to Varz(). @@ -1370,10 +1371,10 @@ type VarzOptions struct{} // SlowConsumersStats contains information about the slow consumers from different type of connections. type SlowConsumersStats struct { - Clients uint64 `json:"clients"` - Routes uint64 `json:"routes"` - Gateways uint64 `json:"gateways"` - Leafs uint64 `json:"leafs"` + Clients uint64 `json:"clients"` // Clients is how many Clients were slow consumers + Routes uint64 `json:"routes"` // Routes is how many Routes were slow consumers + Gateways uint64 `json:"gateways"` // Gateways is how many Gateways were slow consumers + Leafs uint64 `json:"leafs"` // Leafs is how many Leafnodes were slow consumers } func myUptime(d time.Duration) string { @@ -1431,6 +1432,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { a.last { padding-bottom: 16px } a.version { font-size: 14; font-weight: 400; width: 312px; text-align: right; margin-top: -2rem } a.version:hover { color: rgb(22 22 32) } + .endpoint { font-size: 12px; color: #999; font-family: monospace; display: none } + a:hover .endpoint { display: inline } @@ -1441,33 +1444,33 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
- General - JetStream - Connections - Accounts - Account Stats - Subscriptions - Routes - LeafNodes - Gateways - Raft Groups - Health Probe + General %s + JetStream %s + Connections %s + Accounts %s + Account Stats %s + Subscriptions %s + Routes %s + LeafNodes %s + Gateways %s + Raft Groups %s + Health Probe %s Help `, srcUrl, VERSION, - s.basePath(VarzPath), - s.basePath(JszPath), - s.basePath(ConnzPath), - s.basePath(AccountzPath), - s.basePath(AccountStatzPath), - s.basePath(SubszPath), - s.basePath(RoutezPath), - s.basePath(LeafzPath), - s.basePath(GatewayzPath), - s.basePath(RaftzPath), - s.basePath(HealthzPath), + s.basePath(VarzPath), VarzPath, + s.basePath(JszPath), JszPath, + s.basePath(ConnzPath), ConnzPath, + s.basePath(AccountzPath), AccountzPath, + s.basePath(AccountStatzPath), AccountStatzPath, + s.basePath(SubszPath), SubszPath, + s.basePath(RoutezPath), RoutezPath, + s.basePath(LeafzPath), LeafzPath, + s.basePath(GatewayzPath), GatewayzPath, + s.basePath(RaftzPath), RaftzPath, + s.basePath(HealthzPath), HealthzPath, ) } @@ -2245,6 +2248,7 @@ type LeafzOptions struct { // LeafInfo has detailed information on each remote leafnode connection. type LeafInfo struct { + ID uint64 `json:"id"` Name string `json:"name"` IsSpoke bool `json:"is_spoke"` Account string `json:"account"` @@ -2287,6 +2291,7 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) { for _, ln := range lconns { ln.mu.Lock() lni := &LeafInfo{ + ID: ln.cid, Name: ln.leaf.remoteServer, IsSpoke: ln.isSpokeLeafNode(), Account: ln.acc.Name, @@ -2873,12 +2878,12 @@ type AccountDetail struct { // MetaClusterInfo shows information about the meta group. type MetaClusterInfo struct { - Name string `json:"name,omitempty"` - Leader string `json:"leader,omitempty"` - Peer string `json:"peer,omitempty"` - Replicas []*PeerInfo `json:"replicas,omitempty"` - Size int `json:"cluster_size"` - Pending int `json:"pending"` + Name string `json:"name,omitempty"` // Name is the name of the cluster + Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader + Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader + Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers + Size int `json:"cluster_size"` // Size is the known size of the cluster + Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed } // JSInfo has detailed information on JetStream. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/opts.go b/vendor/github.com/nats-io/nats-server/v2/server/opts.go index f6f7fc1863..07b207c0bc 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/opts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/opts.go @@ -247,11 +247,12 @@ type RemoteLeafOpts struct { JetStreamClusterMigrateDelay time.Duration `json:"jetstream_cluster_migrate_delay,omitempty"` } +// JSLimitOpts are active limits for the meta cluster type JSLimitOpts struct { - MaxRequestBatch int `json:"max_request_batch,omitempty"` - MaxAckPending int `json:"max_ack_pending,omitempty"` - MaxHAAssets int `json:"max_ha_assets,omitempty"` - Duplicates time.Duration `json:"max_duplicate_window,omitempty"` + MaxRequestBatch int `json:"max_request_batch,omitempty"` // MaxRequestBatch is the maximum amount of updates that can be sent in a batch + MaxAckPending int `json:"max_ack_pending,omitempty"` // MaxAckPending is the server limit for maximum amount of outstanding Acks + MaxHAAssets int `json:"max_ha_assets,omitempty"` // MaxHAAssets is the maximum of Streams and Consumers that may have more than 1 replica + Duplicates time.Duration `json:"max_duplicate_window,omitempty"` // Duplicates is the maximum value for duplicate tracking on Streams } type JSTpmOpts struct { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 084c945634..e6e237547c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -142,7 +142,7 @@ type raft struct { wal WAL // WAL store (filestore or memstore) wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage - track bool // + bytes uint64 // Total amount of bytes stored in the WAL. (Saves us from needing to call wal.FastState very often) werr error // Last write error state atomic.Int32 // RaftState @@ -164,34 +164,31 @@ type raft struct { llqrt time.Time // Last quorum lost time lsut time.Time // Last scale-up time - term uint64 // The current vote term - pterm uint64 // Previous term from the last snapshot - pindex uint64 // Previous index from the last snapshot - commit uint64 // Index of the most recent commit - applied uint64 // Index of the most recently applied commit + term uint64 // The current vote term + pterm uint64 // Previous term from the last snapshot + pindex uint64 // Previous index from the last snapshot + commit uint64 // Index of the most recent commit + applied uint64 // Index of the most recently applied commit + papplied uint64 // First sequence of our log, matches when we last installed a snapshot. aflr uint64 // Index when to signal initial messages have been applied after becoming leader. 0 means signaling is disabled. leader string // The ID of the leader vote string // Our current vote state - lxfer bool // Are we doing a leadership transfer? - - hcbehind bool // Were we falling behind at the last health check? (see: isCurrent) s *Server // Reference to top-level server c *client // Internal client for subscriptions js *jetStream // JetStream, if running, to see if we are out of resources - dflag bool // Debug flag - hasleader atomic.Bool // Is there a group leader right now? - pleader atomic.Bool // Has the group ever had a leader? - isSysAcc atomic.Bool // Are we utilizing the system account? - maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up. - - observer bool // The node is observing, i.e. not able to become leader + hasleader atomic.Bool // Is there a group leader right now? + pleader atomic.Bool // Has the group ever had a leader? + isSysAcc atomic.Bool // Are we utilizing the system account? extSt extensionState // Extension state + track bool // Whether out of resources checking is enabled. + dflag bool // Debug flag + psubj string // Proposals subject rpsubj string // Remove peers subject vsubj string // Vote requests subject @@ -208,9 +205,7 @@ type raft struct { catchup *catchupState // For when we need to catch up as a follower. progress map[string]*ipQueue[uint64] // For leader or server catching up a follower. - paused bool // Whether or not applies are paused - hcommit uint64 // The commit at the time that applies were paused - pobserver bool // Whether we were an observer at the time that applies were paused + hcommit uint64 // The commit at the time that applies were paused prop *ipQueue[*proposedEntry] // Proposals entry *ipQueue[*appendEntry] // Append entries @@ -220,6 +215,13 @@ type raft struct { votes *ipQueue[*voteResponse] // Vote responses leadc chan bool // Leader changes quit chan struct{} // Raft group shutdown + + lxfer bool // Are we doing a leadership transfer? + hcbehind bool // Were we falling behind at the last health check? (see: isCurrent) + maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up. + paused bool // Whether or not applies are paused + observer bool // The node is observing, i.e. not able to become leader + pobserver bool // Were we previously an observer? } type proposedEntry struct { @@ -439,6 +441,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel // Can't recover snapshots if memory based since wal will be reset. // We will inherit from the current leader. + n.papplied = 0 if _, ok := n.wal.(*memStore); ok { _ = os.RemoveAll(filepath.Join(n.sd, snapshotsDir)) } else { @@ -462,6 +465,8 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel // we will try to replay them and process them here. var state StreamState n.wal.FastState(&state) + n.bytes = state.Bytes + if state.Msgs > 0 { n.debug("Replaying state of %d entries", state.Msgs) if first, err := n.loadFirstEntry(); err == nil { @@ -1092,13 +1097,11 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) { // Calculate the number of entries and estimate the byte size that // we can now remove with a compaction/snapshot. - var state StreamState - n.wal.FastState(&state) - if n.applied > state.FirstSeq { - entries = n.applied - state.FirstSeq + if n.applied > n.papplied { + entries = n.applied - n.papplied } - if state.Msgs > 0 { - bytes = entries * state.Bytes / state.Msgs + if n.bytes > 0 { + bytes = entries * n.bytes / (n.pindex - n.papplied) } return entries, bytes } @@ -1184,7 +1187,7 @@ func (n *raft) InstallSnapshot(data []byte) error { return errNoSnapAvailable } - n.debug("Installing snapshot of %d bytes", len(data)) + n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied) return n.installSnapshot(&snapshot{ lastTerm: term, @@ -1218,6 +1221,10 @@ func (n *raft) installSnapshot(snap *snapshot) error { return err } + var state StreamState + n.wal.FastState(&state) + n.papplied = snap.lastIndex + n.bytes = state.Bytes return nil } @@ -1314,8 +1321,10 @@ func (n *raft) setupLastSnapshot() { // Compact the WAL when we're done if needed. n.pindex = snap.lastIndex n.pterm = snap.lastTerm + // Explicitly only set commit, and not applied. + // Applied will move up when the snapshot is actually applied. n.commit = snap.lastIndex - n.applied = snap.lastIndex + n.papplied = snap.lastIndex n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) @@ -1696,12 +1705,12 @@ func (n *raft) Progress() (index, commit, applied uint64) { } // Size returns number of entries and total bytes for our WAL. -func (n *raft) Size() (uint64, uint64) { +func (n *raft) Size() (entries uint64, bytes uint64) { n.RLock() - var state StreamState - n.wal.FastState(&state) + entries = n.pindex - n.papplied + bytes = n.bytes n.RUnlock() - return state.Msgs, state.Bytes + return entries, bytes } func (n *raft) ID() string { @@ -1957,7 +1966,7 @@ func (n *raft) run() { n.apply.push(nil) runner: - for s.isRunning() { + for { switch n.State() { case Follower: n.runAsFollower() @@ -2691,7 +2700,7 @@ func (n *raft) loadFirstEntry() (ae *appendEntry, err error) { func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64]) { n.RLock() s, reply := n.s, n.areply - peer, subj, term, last := ar.peer, ar.reply, n.term, n.pindex + peer, subj, term, pterm, last := ar.peer, ar.reply, n.term, n.pterm, n.pindex n.RUnlock() defer s.grWG.Done() @@ -2713,7 +2722,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64 indexUpdatesQ.unregister() }() - n.debug("Running catchup for %q", peer) + n.debug("Running catchup for %q [%d:%d] to [%d:%d]", peer, ar.term, ar.index, pterm, last) const maxOutstanding = 2 * 1024 * 1024 // 2MB for now. next, total, om := uint64(0), 0, make(map[uint64]int) @@ -2919,9 +2928,7 @@ func (n *raft) applyCommit(index uint64) error { ae := n.pae[index] if ae == nil { - var state StreamState - n.wal.FastState(&state) - if index < state.FirstSeq { + if index < n.papplied { return nil } var err error @@ -3306,24 +3313,15 @@ func (n *raft) truncateWAL(term, index uint64) { if n.applied > n.commit { n.applied = n.commit } + if n.papplied > n.applied { + n.papplied = n.applied + } }() if err := n.wal.Truncate(index); err != nil { - // If we get an invalid sequence, reset our wal all together. - // We will not have holes, so this means we do not have this message stored anymore. - // This is normal when truncating back to applied/snapshot. - if err == ErrInvalidSequence { - n.debug("Clearing WAL") - n.wal.Truncate(0) - // If our index is non-zero use PurgeEx to set us to the correct next index. - if index > 0 { - n.wal.PurgeEx(fwcs, index+1, 0) - } - } else { - n.warn("Error truncating WAL: %v", err) - n.setWriteErrLocked(err) - return - } + n.warn("Error truncating WAL: %v", err) + n.setWriteErrLocked(err) + return } // Set after we know we have truncated properly. n.pterm, n.pindex = term, index @@ -3515,29 +3513,39 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if ae.pterm != n.pterm || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { - n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) + n.debug("AppendEntry detected pindex less than/equal to ours: [%d:%d] vs [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex) var ar *appendEntryResponse var success bool if ae.pindex < n.commit { // If we have already committed this entry, just mark success. success = true + n.debug("AppendEntry pindex %d below commit %d, marking success", ae.pindex, n.commit) } else if eae, _ := n.loadEntry(ae.pindex); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. if ae.pterm == n.pterm && !catchingUp { success = true + n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex, n.commit) } else if ae.pindex == n.pindex { // Check if only our terms do not match here. // Make sure pterms match and we take on the leader's. // This prevents constant spinning. n.truncateWAL(ae.pterm, ae.pindex) - } else if ae.pindex == n.applied { - // Entry can't be found, this is normal because we have a snapshot at this index. - // Truncate back to where we've created the snapshot. - n.truncateWAL(ae.pterm, ae.pindex) } else { - n.resetWAL() + snap, err := n.loadLastSnapshot() + if err == nil && snap.lastIndex == ae.pindex && snap.lastTerm == ae.pterm { + // Entry can't be found, this is normal because we have a snapshot at this index. + // Truncate back to where we've created the snapshot. + n.truncateWAL(snap.lastTerm, snap.lastIndex) + // Only continue if truncation was successful, and we ended up such that we can safely continue. + if ae.pterm == n.pterm && ae.pindex == n.pindex { + goto CONTINUE + } + } else { + // Otherwise, something has gone very wrong and we need to reset. + n.resetWAL() + } } } else if eae.term == ae.pterm { // If terms match we can delete all entries past this one, and then continue storing the current entry. @@ -3594,12 +3602,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.pterm = ae.pterm n.commit = ae.pindex - if _, err := n.wal.Compact(n.pindex + 1); err != nil { - n.setWriteErrLocked(err) - n.Unlock() - return - } - snap := &snapshot{ lastTerm: n.pterm, lastIndex: n.pindex, @@ -3620,7 +3622,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Setup our state for catching up. - n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) + n.debug("AppendEntry did not match [%d:%d] with [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex) inbox := n.createCatchup(ae) ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) n.Unlock() @@ -3836,6 +3838,13 @@ func (n *raft) storeToWAL(ae *appendEntry) error { return errEntryStoreFailed } + var sz uint64 + if n.wtype == FileStorage { + sz = fileStoreMsgSize(_EMPTY_, nil, ae.buf) + } else { + sz = memStoreMsgSize(_EMPTY_, nil, ae.buf) + } + n.bytes += sz n.pterm = ae.term n.pindex = seq return nil @@ -4083,7 +4092,6 @@ func (n *raft) setWriteErrLocked(err error) { // Ignore non-write errors. if err == ErrStoreClosed || err == ErrStoreEOF || - err == ErrInvalidSequence || err == ErrStoreMsgNotFound || err == errNoPending || err == errPartialCache { @@ -4428,11 +4436,8 @@ func (n *raft) switchToLeader() { n.debug("Switching to leader") - var state StreamState - n.wal.FastState(&state) - // Check if we have items pending as we are taking over. - sendHB := state.LastSeq > n.commit + sendHB := n.pindex > n.commit n.lxfer = false n.updateLeader(n.id) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sdm.go b/vendor/github.com/nats-io/nats-server/v2/server/sdm.go index 7431479580..88a1be4e49 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sdm.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sdm.go @@ -13,7 +13,10 @@ package server -import "time" +import ( + "bytes" + "time" +) // SDMMeta holds pending/proposed data for subject delete markers or message removals. type SDMMeta struct { @@ -40,6 +43,12 @@ func newSDMMeta() *SDMMeta { } } +// isSubjectDeleteMarker returns whether the headers indicate this message is a subject delete marker. +// Either it's a usual marker with JSMarkerReason, or it's a KV Purge marker as the KVOperation. +func isSubjectDeleteMarker(hdr []byte) bool { + return len(sliceHeader(JSMarkerReason, hdr)) == 0 && !bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge) +} + // empty clears all data. func (sdm *SDMMeta) empty() { if sdm == nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/server.go b/vendor/github.com/nats-io/nats-server/v2/server/server.go index 671a16d092..cf539630fb 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/server.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/server.go @@ -2704,7 +2704,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // Setup state that can enable shutdown s.mu.Lock() hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) - l, e := natsListen("tcp", hp) + l, e := s.getServerListener(hp) s.listenerErr = e if e != nil { s.mu.Unlock() @@ -2760,6 +2760,18 @@ func (s *Server) AcceptLoop(clr chan struct{}) { clr = nil } +// getServerListener returns a network listener for the given host-port address. +// If the Server already has an active listener (s.listener), it returns that listener +// along with any previous error (s.listenerErr). Otherwise, it creates and returns +// a new TCP listener on the specified address using natsListen. +func (s *Server) getServerListener(hp string) (net.Listener, error) { + if s.listener != nil { + return s.listener, s.listenerErr + } + + return natsListen("tcp", hp) +} + // InProcessConn returns an in-process connection to the server, // avoiding the need to use a TCP listener for local connectivity // within the same process. This can be used regardless of the diff --git a/vendor/github.com/nats-io/nats-server/v2/server/store.go b/vendor/github.com/nats-io/nats-server/v2/server/store.go index 0774706c9f..271c02cbf4 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/store.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/store.go @@ -61,8 +61,6 @@ var ( ErrStoreWrongType = errors.New("wrong storage type") // ErrNoAckPolicy is returned when trying to update a consumer's acks with no ack policy. ErrNoAckPolicy = errors.New("ack policy is none") - // ErrInvalidSequence is returned when the sequence is not present in the stream store. - ErrInvalidSequence = errors.New("invalid sequence") // ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong. ErrSequenceMismatch = errors.New("expected sequence does not match store") // ErrCorruptStreamState diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index cf70ad9142..9029427fc5 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -223,12 +223,12 @@ type ClusterInfo struct { // PeerInfo shows information about all the peers in the cluster that // are supporting the stream or consumer. type PeerInfo struct { - Name string `json:"name"` - Current bool `json:"current"` - Offline bool `json:"offline,omitempty"` - Active time.Duration `json:"active"` - Lag uint64 `json:"lag,omitempty"` - Peer string `json:"peer"` + Name string `json:"name"` // Name is the unique name for the peer + Current bool `json:"current"` // Current indicates if it was seen recently and fully caught up + Offline bool `json:"offline,omitempty"` // Offline indicates if it has not been seen recently + Active time.Duration `json:"active"` // Active is the timestamp it was last active + Lag uint64 `json:"lag,omitempty"` // Lag is how many operations behind it is + Peer string `json:"peer"` // Peer is the unique ID for the peer // For migrations. cluster string } @@ -432,6 +432,12 @@ const ( JSMarkerReason = "Nats-Marker-Reason" ) +// Headers for published KV messages. +var ( + KVOperation = "KV-Operation" + KVOperationValuePurge = []byte("PURGE") +) + // Headers for republished messages and direct gets. const ( JSStream = "Nats-Stream" @@ -1873,7 +1879,7 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server, pedan // Check limits. We need some extra handling to allow updating MaxBytes. // First, let's calculate the difference between the new and old MaxBytes. - maxBytesDiff := cfg.MaxBytes - old.MaxBytes + maxBytesDiff := max(cfg.MaxBytes, 0) - max(old.MaxBytes, 0) if maxBytesDiff < 0 { // If we're updating to a lower MaxBytes (maxBytesDiff is negative), // then set to zero so checkBytesLimits doesn't set addBytes to 1. @@ -2205,7 +2211,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool, mset.mu.Unlock() if js != nil { - maxBytesDiff := cfg.MaxBytes - ocfg.MaxBytes + maxBytesDiff := max(cfg.MaxBytes, 0) - max(ocfg.MaxBytes, 0) if maxBytesDiff > 0 { // Reserve the difference js.reserveStreamResources(&StreamConfig{ diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sublist.go b/vendor/github.com/nats-io/nats-server/v2/server/sublist.go index 54e9323964..078445c988 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sublist.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sublist.go @@ -1433,6 +1433,12 @@ func tokenizeSubjectIntoSlice(tts []string, subject string) []string { return tts } +// SubjectMatchesFilter returns true if the subject matches the provided +// filter or false otherwise. +func SubjectMatchesFilter(subject, filter string) bool { + return subjectIsSubsetMatch(subject, filter) +} + // Calls into the function isSubsetMatch() func subjectIsSubsetMatch(subject, test string) bool { tsa := [32]string{} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/thw/thw.go b/vendor/github.com/nats-io/nats-server/v2/server/thw/thw.go index bc2b271e66..a74265ef01 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/thw/thw.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/thw/thw.go @@ -152,51 +152,49 @@ func (hw *HashWheel) Update(seq uint64, oldExpires int64, newExpires int64) erro // ExpireTasks processes all expired tasks using a callback, but only expires a task if the callback returns true. func (hw *HashWheel) ExpireTasks(callback func(seq uint64, expires int64) bool) { now := time.Now().UnixNano() + hw.expireTasks(now, callback) +} +func (hw *HashWheel) expireTasks(ts int64, callback func(seq uint64, expires int64) bool) { // Quick return if nothing is expired. - if hw.lowest > now { + if hw.lowest > ts { return } - // Start from the slot containing the lowest expiration. - startPos, exitPos := hw.getPosition(hw.lowest), hw.getPosition(now+tickDuration) - var updateLowest bool - - for offset := int64(0); ; offset++ { - pos := (startPos + offset) & wheelMask - if pos == exitPos { - if updateLowest { - hw.updateLowestExpires() + globalLowest := int64(math.MaxInt64) + for pos, s := range hw.wheel { + // Skip s if nothing to expire. + if s == nil || s.lowest > ts { + if s != nil && s.lowest < globalLowest { + globalLowest = s.lowest } - return - } - // Grab our slot. - slot := hw.wheel[pos] - if slot == nil || slot.lowest > now { continue } // Track new lowest while processing expirations - newLowest := int64(math.MaxInt64) - for seq, expires := range slot.entries { - if expires <= now && callback(seq, expires) { - delete(slot.entries, seq) + slotLowest := int64(math.MaxInt64) + for seq, expires := range s.entries { + if expires <= ts && callback(seq, expires) { + delete(s.entries, seq) hw.count-- - updateLowest = true continue } - if expires < newLowest { - newLowest = expires + if expires < slotLowest { + slotLowest = expires } } // Nil out if we are empty. - if len(slot.entries) == 0 { + if len(s.entries) == 0 { hw.wheel[pos] = nil } else { - slot.lowest = newLowest + s.lowest = slotLowest + if slotLowest < globalLowest { + globalLowest = slotLowest + } } } + hw.lowest = globalLowest } // GetNextExpiration returns the earliest expiration time before the given time. diff --git a/vendor/modules.txt b/vendor/modules.txt index 77046e89ef..ad3ac974fa 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -996,7 +996,7 @@ github.com/munnerz/goautoneg # github.com/nats-io/jwt/v2 v2.7.4 ## explicit; go 1.23.0 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.11.6 +# github.com/nats-io/nats-server/v2 v2.11.7 ## explicit; go 1.23.0 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/fastrand