diff --git a/go.mod b/go.mod
index 0ba51bf3cf..284dfadf6c 100644
--- a/go.mod
+++ b/go.mod
@@ -55,7 +55,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
- github.com/nats-io/nats-server/v2 v2.11.6
+ github.com/nats-io/nats-server/v2 v2.11.7
github.com/nats-io/nats.go v1.43.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.0.8
diff --git a/go.sum b/go.sum
index 28c0879525..5ef738ebf8 100644
--- a/go.sum
+++ b/go.sum
@@ -821,8 +821,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
-github.com/nats-io/nats-server/v2 v2.11.6 h1:4VXRjbTUFKEB+7UoaKL3F5Y83xC7MxPoIONOnGgpkHw=
-github.com/nats-io/nats-server/v2 v2.11.6/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs=
+github.com/nats-io/nats-server/v2 v2.11.7 h1:lINWQ/Hb3cnaoHmWTjj/7WppZnaSh9C/1cD//nHCbms=
+github.com/nats-io/nats-server/v2 v2.11.7/go.mod h1:DchDPVzAsAPqhqm7VLedX0L7hjnV/SYtlmsl9F8U53s=
github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug=
github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go
index caf3f91532..8accdb2742 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/client.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go
@@ -4360,7 +4360,7 @@ func sliceHeader(key string, hdr []byte) []byte {
if len(hdr) == 0 {
return nil
}
- index := bytes.Index(hdr, stringToBytes(key))
+ index := bytes.Index(hdr, stringToBytes(key+":"))
hdrLen := len(hdr)
// Check that we have enough characters, this will handle the -1 case of the key not
// being found and will also handle not having enough characters for trailing CRLF.
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go
index bd46d8a4e0..1896ba575b 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/const.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go
@@ -58,7 +58,7 @@ func init() {
const (
// VERSION is the current version for the server.
- VERSION = "2.11.6"
+ VERSION = "2.11.7"
// PROTO is the currently supported protocol.
// 0 was the original
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go
index 5450449584..d15b6c46c7 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go
@@ -437,7 +437,8 @@ type consumer struct {
rdqi avl.SequenceSet
rdc map[uint64]uint64
replies map[uint64]string
- pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum.
+ pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum.
+ waitingDeliveries map[string]*waitingDelivery // (Optional) request timeout messages that need to wait for replicated deliveries first.
maxdc uint64
waiting *waitQueue
cfg ConsumerConfig
@@ -819,6 +820,9 @@ func checkConsumerCfg(
}
if config.PriorityPolicy != PriorityNone {
+ if config.DeliverSubject != "" {
+ return NewJSConsumerPushWithPriorityGroupError()
+ }
if len(config.PriorityGroups) == 0 {
return NewJSConsumerPriorityPolicyWithoutGroupError()
}
@@ -1846,7 +1850,7 @@ func (o *consumer) deleteNotActive() {
} else {
// Pull mode.
elapsed := time.Since(o.waiting.last)
- if elapsed <= o.cfg.InactiveThreshold {
+ if elapsed < o.dthresh {
// These need to keep firing so reset but use delta.
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh - elapsed)
@@ -1866,6 +1870,43 @@ func (o *consumer) deleteNotActive() {
o.mu.Unlock()
return
}
+
+ // We now know we have no waiting requests, and our last request was long ago.
+ // However, based on AckWait the consumer could still be actively processing,
+ // even if we haven't been informed if there were no acks in the meantime.
+ // We must wait for the message that expires last and start counting down the
+ // inactive threshold from there.
+ now := time.Now().UnixNano()
+ l := len(o.cfg.BackOff)
+ var delay time.Duration
+ var ackWait time.Duration
+ for _, p := range o.pending {
+ if l == 0 {
+ ackWait = o.ackWait(0)
+ } else {
+ bi := int(o.rdc[p.Sequence])
+ if bi < 0 {
+ bi = 0
+ } else if bi >= l {
+ bi = l - 1
+ }
+ ackWait = o.ackWait(o.cfg.BackOff[bi])
+ }
+ if ts := p.Timestamp + ackWait.Nanoseconds() + o.dthresh.Nanoseconds(); ts > now {
+ delay = max(delay, time.Duration(ts-now))
+ }
+ }
+ // We'll wait for the latest time we expect an ack, plus the inactive threshold.
+ // Acknowledging a message will reset this back down to just the inactive threshold.
+ if delay > 0 {
+ if o.dtmr != nil {
+ o.dtmr.Reset(delay)
+ } else {
+ o.dtmr = time.AfterFunc(delay, o.deleteNotActive)
+ }
+ o.mu.Unlock()
+ return
+ }
}
s, js := o.mset.srv, o.srv.js.Load()
@@ -2540,11 +2581,23 @@ func (o *consumer) addAckReply(sseq uint64, reply string) {
// Used to remember messages that need to be sent for a replicated consumer, after delivered quorum.
// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
- // Is not explicitly limited in size, but will at maximum hold maximum ack pending.
+ // Is not explicitly limited in size, but will at most hold maximum ack pending.
if o.pendingDeliveries == nil {
o.pendingDeliveries = make(map[uint64]*jsPubMsg)
}
o.pendingDeliveries[pmsg.seq] = pmsg
+
+ // Is not explicitly limited in size, but will at most hold maximum waiting requests.
+ if o.waitingDeliveries == nil {
+ o.waitingDeliveries = make(map[string]*waitingDelivery)
+ }
+ if wd, ok := o.waitingDeliveries[pmsg.dsubj]; ok {
+ wd.seq = pmsg.seq
+ } else {
+ wd := wdPool.Get().(*waitingDelivery)
+ wd.seq = pmsg.seq
+ o.waitingDeliveries[pmsg.dsubj] = wd
+ }
}
// Lock should be held.
@@ -3446,6 +3499,28 @@ func (wr *waitingRequest) recycle() {
}
}
+// Represents an (optional) request timeout that's sent after waiting for replicated deliveries.
+type waitingDelivery struct {
+ seq uint64
+ pn int // Pending messages.
+ pb int // Pending bytes.
+}
+
+// sync.Pool for waiting deliveries.
+var wdPool = sync.Pool{
+ New: func() any {
+ return new(waitingDelivery)
+ },
+}
+
+// Force a recycle.
+func (wd *waitingDelivery) recycle() {
+ if wd != nil {
+ wd.seq, wd.pn, wd.pb = 0, 0, 0
+ wdPool.Put(wd)
+ }
+}
+
// waiting queue for requests that are waiting for new messages to arrive.
type waitQueue struct {
n, max int
@@ -3721,8 +3796,19 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
} else {
// We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there.
- hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
- o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
+ rdWait := o.replicateDeliveries()
+ if rdWait {
+ // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately.
+ if wd, ok := o.waitingDeliveries[wr.reply]; !ok {
+ rdWait = false
+ } else {
+ wd.pn, wd.pb = wr.n, wr.b
+ }
+ }
+ if !rdWait {
+ hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
+ o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
+ }
o.waiting.removeCurrent()
if o.node != nil {
o.removeClusterPendingRequest(wr.reply)
@@ -4187,8 +4273,19 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
for wr := wq.head; wr != nil; {
// Check expiration.
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
- hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
- o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
+ rdWait := o.replicateDeliveries()
+ if rdWait {
+ // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately.
+ if wd, ok := o.waitingDeliveries[wr.reply]; !ok {
+ rdWait = false
+ } else {
+ wd.pn, wd.pb = wr.n, wr.b
+ }
+ }
+ if !rdWait {
+ hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
+ o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
+ }
wr = remove(pre, wr)
continue
}
@@ -4368,16 +4465,19 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
for {
select {
case <-o.ackMsgs.ch:
+ // If we have an inactiveThreshold set, mark our activity.
+ // Do this before processing acks, otherwise we might race if there are no pending messages
+ // anymore and the inactivity threshold kicks in before we're able to mark activity.
+ if hasInactiveThresh {
+ o.suppressDeletion()
+ }
+
acks := o.ackMsgs.pop()
for _, ack := range acks {
o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg)
ack.returnToPool()
}
o.ackMsgs.recycle(&acks)
- // If we have an inactiveThreshold set, mark our activity.
- if hasInactiveThresh {
- o.suppressDeletion()
- }
case <-ticker.C:
o.checkAckFloor()
case <-qch:
@@ -4425,8 +4525,11 @@ func (o *consumer) suppressDeletion() {
// if dtmr is not nil we have started the countdown, simply reset to threshold.
o.dtmr.Reset(o.dthresh)
} else if o.isPullMode() && o.waiting != nil {
- // Pull mode always has timer running, just update last on waiting queue.
+ // Pull mode always has timer running, update last on waiting queue.
o.waiting.last = time.Now()
+ if o.dtmr != nil {
+ o.dtmr.Reset(o.dthresh)
+ }
}
}
@@ -4485,7 +4588,6 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
delay time.Duration
sz int
wrn, wrb int
- wrNoWait bool
)
o.mu.Lock()
@@ -4564,7 +4666,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if o.isPushMode() {
dsubj = o.dsubj
} else if wr := o.nextWaiting(sz); wr != nil {
- wrn, wrb, wrNoWait = wr.n, wr.b, wr.noWait
+ wrn, wrb = wr.n, wr.b
dsubj = wr.reply
if o.cfg.PriorityPolicy == PriorityPinnedClient {
// FIXME(jrm): Can we make this prettier?
@@ -4639,7 +4741,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
// Do actual delivery.
- o.deliverMsg(dsubj, ackReply, pmsg, dc, rp, wrNoWait)
+ o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)
// If given request fulfilled batch size, but there are still pending bytes, send information about it.
if wrn <= 0 && wrb > 0 {
@@ -4838,7 +4940,7 @@ func convertToHeadersOnly(pmsg *jsPubMsg) {
// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
-func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy, wrNoWait bool) {
+func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) {
if o.mset == nil {
pmsg.returnToPool()
return
@@ -4871,15 +4973,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
}
// Send message.
- // If we're replicated we MUST only send the message AFTER we've got quorum for updating
- // delivered state. Otherwise, we could be in an invalid state after a leader change.
- // We can send immediately if not replicated, not using acks, or using flow control (incompatible).
- // TODO(mvv): If NoWait we also bypass replicating first.
- // Ideally we'd only send the NoWait request timeout after replication and delivery.
- if o.node == nil || ap == AckNone || o.cfg.FlowControl || wrNoWait {
- o.outq.send(pmsg)
- } else {
+ if o.replicateDeliveries() {
o.addReplicatedQueuedMsg(pmsg)
+ } else {
+ o.outq.send(pmsg)
}
// Flow control.
@@ -4902,6 +4999,15 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
}
}
+// replicateDeliveries returns whether deliveries should be replicated before sending them.
+// If we're replicated we MUST only send the message AFTER we've got quorum for updating
+// delivered state. Otherwise, we could be in an invalid state after a leader change.
+// We can send immediately if not replicated, not using acks, or using flow control (incompatible).
+// Lock should be held.
+func (o *consumer) replicateDeliveries() bool {
+ return o.node != nil && o.cfg.AckPolicy != AckNone && !o.cfg.FlowControl
+}
+
func (o *consumer) needFlowControl(sz int) bool {
if o.maxpb == 0 {
return false
@@ -6148,4 +6254,8 @@ func (o *consumer) resetPendingDeliveries() {
pmsg.returnToPool()
}
o.pendingDeliveries = nil
+ for _, wd := range o.waitingDeliveries {
+ wd.recycle()
+ }
+ o.waitingDeliveries = nil
}
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/errors.json b/vendor/github.com/nats-io/nats-server/v2/server/errors.json
index 7b90366a6e..3a80cc4d64 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/errors.json
+++ b/vendor/github.com/nats-io/nats-server/v2/server/errors.json
@@ -1658,5 +1658,15 @@
"help": "",
"url": "",
"deprecates": ""
+ },
+ {
+ "constant": "JSConsumerPushWithPriorityGroupErr",
+ "code": 400,
+ "error_code": 10178,
+ "description": "priority groups can not be used with push consumers",
+ "comment": "",
+ "help": "",
+ "url": "",
+ "deprecates": ""
}
]
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go
index d579618bb1..3d44340b7b 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/events.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go
@@ -1401,10 +1401,9 @@ func (s *Server) initEventTracking() {
}
}
- // User info.
- // TODO(dlc) - Can be internal and not forwarded since bound server for the client connection
- // is only one that will answer. This breaks tests since we still forward on remote server connect.
- if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
+ // User info. Do not propagate interest so that we know the local server to the connection
+ // is the only one that will answer the requests.
+ if _, err := s.sysSubscribeInternal(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go
index 5a8b22cb61..1181b58ced 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go
@@ -306,8 +306,8 @@ const (
consumerDir = "obs"
// Index file for a consumer.
consumerState = "o.dat"
- // The suffix that will be given to a new temporary block during compression.
- compressTmpSuffix = ".tmp"
+ // The suffix that will be given to a new temporary block for compression or when rewriting the full file.
+ blkTmpSuffix = ".tmp"
// This is where we keep state on templates.
tmplsDir = "templates"
// Maximum size of a write buffer we may consider for re-use.
@@ -651,7 +651,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
// Create or delete the THW if needed.
if cfg.AllowMsgTTL && fs.ttls == nil {
- fs.ttls = thw.NewHashWheel()
+ fs.recoverTTLState()
} else if !cfg.AllowMsgTTL && fs.ttls != nil {
fs.ttls = nil
}
@@ -1201,7 +1201,9 @@ func (fs *fileStore) rebuildStateLocked(ld *LostStreamData) {
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
}
- if lseq := atomic.LoadUint64(&mb.last.seq); lseq > fs.state.LastSeq {
+ // Preserve last time, could have erased the last message in one block, and then
+ // have a tombstone with the proper timestamp afterward in another block
+ if lseq := atomic.LoadUint64(&mb.last.seq); lseq >= fs.state.LastSeq {
fs.state.LastSeq = lseq
if mb.last.ts == 0 {
fs.state.LastTime = time.Time{}
@@ -1271,10 +1273,13 @@ func (mb *msgBlock) convertCipher() error {
buf, _ := mb.loadBlock(nil)
bek.XORKeyStream(buf, buf)
- // Make sure we can parse with old cipher and key file.
- if err = mb.indexCacheBuf(buf); err != nil {
+ // Check for compression, and make sure we can parse with old cipher and key file.
+ if nbuf, err := mb.decompressIfNeeded(buf); err != nil {
+ return err
+ } else if err = mb.indexCacheBuf(nbuf); err != nil {
return err
}
+
// Reset the cache since we just read everything in.
mb.cache = nil
@@ -1306,6 +1311,10 @@ func (mb *msgBlock) convertToEncrypted() error {
if err != nil {
return err
}
+ // Check for compression.
+ if buf, err = mb.decompressIfNeeded(buf); err != nil {
+ return err
+ }
if err := mb.indexCacheBuf(buf); err != nil {
// This likely indicates this was already encrypted or corrupt.
mb.cache = nil
@@ -1376,15 +1385,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
firstNeedsSet := true
// Check if we need to decrypt.
- if mb.bek != nil && len(buf) > 0 {
- // Recreate to reset counter.
- mb.bek, err = genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
- if err != nil {
- return nil, nil, err
- }
- mb.bek.XORKeyStream(buf, buf)
+ if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
+ return nil, nil, err
}
-
// Check for compression.
if buf, err = mb.decompressIfNeeded(buf); err != nil {
return nil, nil, err
@@ -1799,6 +1802,11 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}
bi += n
}
+
+ // Pre-emptively mark block as closed, we'll confirm this block
+ // still exists on disk and report it as lost if not.
+ mb.closed = true
+
// Only add in if not empty or the lmb.
if mb.msgs > 0 || i == lastIndex {
fs.addMsgBlock(mb)
@@ -1873,10 +1881,26 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if index > blkIndex {
fs.warn("Stream state outdated, found extra blocks, will rebuild")
return errPriorState
+ } else if mb, ok := fs.bim[index]; ok {
+ mb.closed = false
}
}
}
+ var rebuild bool
+ for _, mb := range fs.blks {
+ if mb.closed {
+ rebuild = true
+ if ld, _, _ := mb.rebuildState(); ld != nil {
+ fs.addLostData(ld)
+ }
+ fs.warn("Stream state detected prior state, could not locate msg block %d", mb.index)
+ }
+ }
+ if rebuild {
+ return errPriorState
+ }
+
// We check first and last seq and number of msgs and bytes. If there is a difference,
// return and error so we rebuild from the message block state on disk.
if !trackingStatesEqual(&fs.state, &mstate) {
@@ -1888,6 +1912,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return nil
}
+// Lock should be held.
func (fs *fileStore) recoverTTLState() error {
// See if we have a timed hash wheel for TTLs.
<-dios
@@ -1917,31 +1942,34 @@ func (fs *fileStore) recoverTTLState() error {
defer fs.resetAgeChk(0)
if fs.state.Msgs > 0 && ttlseq <= fs.state.LastSeq {
fs.warn("TTL state is outdated; attempting to recover using linear scan (seq %d to %d)", ttlseq, fs.state.LastSeq)
- var sm StoreMsg
- mb := fs.selectMsgBlock(ttlseq)
- if mb == nil {
- return nil
- }
- mblseq := atomic.LoadUint64(&mb.last.seq)
+ var (
+ mb *msgBlock
+ sm StoreMsg
+ mblseq uint64
+ )
for seq := ttlseq; seq <= fs.state.LastSeq; seq++ {
retry:
+ if mb == nil {
+ if mb = fs.selectMsgBlock(seq); mb == nil {
+ // Selecting the message block should return a block that contains this sequence,
+ // or a later block if it can't be found.
+ // It's an error if we can't find any block within the bounds of first and last seq.
+ fs.warn("Error loading msg block with seq %d for recovering TTL: %s", seq)
+ continue
+ }
+ seq = atomic.LoadUint64(&mb.first.seq)
+ mblseq = atomic.LoadUint64(&mb.last.seq)
+ }
if mb.ttls == 0 {
// None of the messages in the block have message TTLs so don't
// bother doing anything further with this block, skip to the end.
seq = atomic.LoadUint64(&mb.last.seq) + 1
}
if seq > mblseq {
- // We've reached the end of the loaded block, see if we can continue
- // by loading the next one.
+ // We've reached the end of the loaded block, so let's go back to the
+ // beginning and process the next block.
mb.tryForceExpireCache()
- if mb = fs.selectMsgBlock(seq); mb == nil {
- // TODO(nat): Deal with gaps properly. Right now this will be
- // probably expensive on CPU.
- continue
- }
- mblseq = atomic.LoadUint64(&mb.last.seq)
- // At this point we've loaded another block, so let's go back to the
- // beginning and see if we need to skip this one too.
+ mb = nil
goto retry
}
msg, _, err := mb.fetchMsgNoCopy(seq, &sm)
@@ -1983,12 +2011,9 @@ func (mb *msgBlock) lastChecksum() []byte {
}
if mb.bek != nil {
if buf, _ := mb.loadBlock(nil); len(buf) >= checksumSize {
- bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
- if err != nil {
+ if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
return nil
}
- mb.bek = bek
- mb.bek.XORKeyStream(buf, buf)
copy(lchk[0:], buf[len(buf)-checksumSize:])
}
} else {
@@ -2082,7 +2107,9 @@ func (fs *fileStore) recoverMsgs() error {
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
}
- if lseq := atomic.LoadUint64(&mb.last.seq); lseq > fs.state.LastSeq {
+ // Preserve last time, could have erased the last message in one block, and then
+ // have a tombstone with the proper timestamp afterward in another block
+ if lseq := atomic.LoadUint64(&mb.last.seq); lseq >= fs.state.LastSeq {
fs.state.LastSeq = lseq
if mb.last.ts == 0 {
fs.state.LastTime = time.Time{}
@@ -3935,16 +3962,27 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
var rbuf []byte
if lmb := fs.lmb; lmb != nil {
+ lmb.mu.Lock()
index = lmb.index + 1
+
+ // Flush any pending messages.
+ lmb.flushPendingMsgsLocked()
// Determine if we can reclaim any resources here.
- if fs.fip {
- lmb.mu.Lock()
- lmb.closeFDsLocked()
- if lmb.cache != nil {
- // Reset write timestamp and see if we can expire this cache.
- rbuf = lmb.tryExpireWriteCache()
- }
- lmb.mu.Unlock()
+ lmb.closeFDsLockedNoCheck()
+ if lmb.cache != nil {
+ // Reset write timestamp and see if we can expire this cache.
+ rbuf = lmb.tryExpireWriteCache()
+ }
+ lmb.mu.Unlock()
+
+ if fs.fcfg.Compression != NoCompression {
+ // We've now reached the end of this message block, if we want
+ // to compress blocks then now's the time to do it.
+ go func() {
+ lmb.mu.Lock()
+ defer lmb.mu.Unlock()
+ lmb.recompressOnDiskIfNeeded()
+ }()
}
}
@@ -4208,20 +4246,19 @@ func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64,
// we will place an empty record marking the sequence as used. The
// sequence will be marked erased.
// fs lock should be held.
-func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
+func (mb *msgBlock) skipMsg(seq uint64, now int64) {
if mb == nil {
return
}
var needsRecord bool
- nowts := ats.AccessTime()
mb.mu.Lock()
// If we are empty can just do meta.
if mb.msgs == 0 {
atomic.StoreUint64(&mb.last.seq, seq)
- mb.last.ts = nowts
+ mb.last.ts = now
atomic.StoreUint64(&mb.first.seq, seq+1)
- mb.first.ts = nowts
+ mb.first.ts = 0
needsRecord = mb == mb.fs.lmb
if needsRecord && mb.rbytes > 0 {
// We want to make sure since we have no messages
@@ -4240,7 +4277,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
mb.mu.Unlock()
if needsRecord {
- mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, nowts, true)
+ mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, now, true)
} else {
mb.kickFlusher()
}
@@ -4258,18 +4295,18 @@ func (fs *fileStore) SkipMsg() uint64 {
}
// Grab time and last seq.
- now, seq := time.Now(), fs.state.LastSeq+1
+ now, seq := ats.AccessTime(), fs.state.LastSeq+1
// Write skip msg.
mb.skipMsg(seq, now)
// Update fs state.
- fs.state.LastSeq, fs.state.LastTime = seq, now
+ fs.state.LastSeq, fs.state.LastTime = seq, time.Unix(0, now).UTC()
if fs.state.Msgs == 0 {
- fs.state.FirstSeq, fs.state.FirstTime = seq, now
+ fs.state.FirstSeq, fs.state.FirstTime = seq, time.Time{}
}
if seq == fs.state.FirstSeq {
- fs.state.FirstSeq, fs.state.FirstTime = seq+1, now
+ fs.state.FirstSeq, fs.state.FirstTime = seq+1, time.Time{}
}
// Mark as dirty for stream state.
fs.dirty++
@@ -4299,11 +4336,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
numDeletes += mb.dmap.Size()
}
if mb == nil || numDeletes > maxDeletes && mb.msgs > 0 || mb.msgs > 0 && mb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize {
- if mb != nil && fs.fcfg.Compression != NoCompression {
- // We've now reached the end of this message block, if we want
- // to compress blocks then now's the time to do it.
- go mb.recompressOnDiskIfNeeded()
- }
var err error
if mb, err = fs.newMsgBlockForWrite(); err != nil {
return err
@@ -4311,17 +4343,16 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
}
// Insert into dmap all entries and place last as marker.
- now := time.Now()
- nowts := now.UnixNano()
+ now := ats.AccessTime()
lseq := seq + num - 1
mb.mu.Lock()
// If we are empty update meta directly.
if mb.msgs == 0 {
atomic.StoreUint64(&mb.last.seq, lseq)
- mb.last.ts = nowts
+ mb.last.ts = now
atomic.StoreUint64(&mb.first.seq, lseq+1)
- mb.first.ts = nowts
+ mb.first.ts = 0
} else {
for ; seq <= lseq; seq++ {
mb.dmap.Insert(seq)
@@ -4330,13 +4361,13 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
mb.mu.Unlock()
// Write out our placeholder.
- mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)
+ mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, now, true)
// Now update FS accounting.
// Update fs state.
- fs.state.LastSeq, fs.state.LastTime = lseq, now
+ fs.state.LastSeq, fs.state.LastTime = lseq, time.Unix(0, now).UTC()
if fs.state.Msgs == 0 {
- fs.state.FirstSeq, fs.state.FirstTime = lseq+1, now
+ fs.state.FirstSeq, fs.state.FirstTime = lseq+1, time.Time{}
}
// Mark as dirty for stream state.
@@ -4759,18 +4790,19 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
- if secure {
- // Grab record info.
- ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq))
- if err := mb.eraseMsg(seq, int(ri), int(rl)); err != nil {
- return false, err
- }
- }
-
fifo := seq == atomic.LoadUint64(&mb.first.seq)
isLastBlock := mb == fs.lmb
isEmpty := mb.msgs == 0
+ // If erase but block is empty, we can simply remove the block later.
+ if secure && !isEmpty {
+ // Grab record info.
+ ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq))
+ if err := mb.eraseMsg(seq, int(ri), int(rl), isLastBlock); err != nil {
+ return false, err
+ }
+ }
+
if fifo {
mb.selectNextFirst()
if !isEmpty {
@@ -5150,7 +5182,7 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) {
}
// Lock should be held.
-func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error {
+func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int, isLastBlock bool) error {
var le = binary.LittleEndian
var hdr [msgHdrSize]byte
@@ -5192,35 +5224,23 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error {
// Disk
if mb.cache.off+mb.cache.wp > ri {
- <-dios
- mfd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
- dios <- struct{}{}
- if err != nil {
- return err
- }
- defer mfd.Close()
- if _, err = mfd.WriteAt(nbytes, int64(ri)); err == nil {
- mfd.Sync()
- }
- if err != nil {
+ if err := mb.atomicOverwriteFile(mb.cache.buf, !isLastBlock); err != nil {
return err
}
}
return nil
}
-// Truncate this message block to the storedMsg.
-func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
- mb.mu.Lock()
- defer mb.mu.Unlock()
-
+// Truncate this message block to the tseq and ts.
+// Lock should be held.
+func (mb *msgBlock) truncate(tseq uint64, ts int64) (nmsgs, nbytes uint64, err error) {
// Make sure we are loaded to process messages etc.
if err := mb.loadMsgsWithLock(); err != nil {
return 0, 0, err
}
// Calculate new eof using slot info from our new last sm.
- ri, rl, _, err := mb.slotInfo(int(sm.seq - mb.cache.fseq))
+ ri, rl, _, err := mb.slotInfo(int(tseq - mb.cache.fseq))
if err != nil {
return 0, 0, err
}
@@ -5232,7 +5252,7 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
checkDmap := mb.dmap.Size() > 0
var smv StoreMsg
- for seq := atomic.LoadUint64(&mb.last.seq); seq > sm.seq; seq-- {
+ for seq := atomic.LoadUint64(&mb.last.seq); seq > tseq; seq-- {
if checkDmap {
if mb.dmap.Exists(seq) {
// Delete and skip to next.
@@ -5266,46 +5286,19 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
if err != nil {
return 0, 0, fmt.Errorf("failed to load block from disk: %w", err)
}
- if mb.bek != nil && len(buf) > 0 {
- bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
- if err != nil {
- return 0, 0, err
- }
- mb.bek = bek
- mb.bek.XORKeyStream(buf, buf)
+ if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
+ return 0, 0, err
}
- buf, err = mb.decompressIfNeeded(buf)
- if err != nil {
+ if buf, err = mb.decompressIfNeeded(buf); err != nil {
return 0, 0, fmt.Errorf("failed to decompress block: %w", err)
}
buf = buf[:eof]
copy(mb.lchk[0:], buf[:len(buf)-checksumSize])
- buf, err = mb.cmp.Compress(buf)
- if err != nil {
- return 0, 0, fmt.Errorf("failed to recompress block: %w", err)
+ // We did decompress but don't recompress the truncated buffer here since we're the last block
+ // and would otherwise have compressed data and allow to write uncompressed data in the same block.
+ if err = mb.atomicOverwriteFile(buf, false); err != nil {
+ return 0, 0, err
}
- meta := &CompressionInfo{
- Algorithm: mb.cmp,
- OriginalSize: uint64(eof),
- }
- buf = append(meta.MarshalMetadata(), buf...)
- if mb.bek != nil && len(buf) > 0 {
- bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
- if err != nil {
- return 0, 0, err
- }
- mb.bek = bek
- mb.bek.XORKeyStream(buf, buf)
- }
- n, err := mb.writeAt(buf, 0)
- if err != nil {
- return 0, 0, fmt.Errorf("failed to rewrite compressed block: %w", err)
- }
- if n != len(buf) {
- return 0, 0, fmt.Errorf("short write (%d != %d)", n, len(buf))
- }
- mb.mfd.Truncate(int64(len(buf)))
- mb.mfd.Sync()
} else if mb.mfd != nil {
mb.mfd.Truncate(eof)
mb.mfd.Sync()
@@ -5318,8 +5311,8 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
}
// Update our last msg.
- atomic.StoreUint64(&mb.last.seq, sm.seq)
- mb.last.ts = sm.ts
+ atomic.StoreUint64(&mb.last.seq, tseq)
+ mb.last.ts = ts
// Clear our cache.
mb.clearCacheAndOffset()
@@ -5383,7 +5376,11 @@ func (fs *fileStore) selectNextFirst() {
mb := fs.blks[0]
mb.mu.RLock()
fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq)
- fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
+ if mb.first.ts == 0 {
+ fs.state.FirstTime = time.Time{}
+ } else {
+ fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
+ }
mb.mu.RUnlock()
} else {
// Could not find anything, so treat like purge
@@ -5649,7 +5646,7 @@ func (fs *fileStore) expireMsgs() {
// if it was the last message of that particular subject that we just deleted.
if sdmEnabled {
if last, ok := fs.shouldProcessSdm(seq, sm.subj); ok {
- sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0
+ sdm := last && isSubjectDeleteMarker(sm.hdr)
fs.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
@@ -5682,7 +5679,7 @@ func (fs *fileStore) expireMsgs() {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
- ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0})
+ ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)})
} else {
// Collect sequences to remove. Don't remove messages inline here,
// as that releases the lock and THW is not thread-safe.
@@ -6113,14 +6110,6 @@ func (fs *fileStore) checkLastBlock(rl uint64) (lmb *msgBlock, err error) {
lmb = fs.lmb
rbytes := lmb.blkSize()
if lmb == nil || (rbytes > 0 && rbytes+rl > fs.fcfg.BlockSize) {
- if lmb != nil {
- lmb.flushPendingMsgs()
- if fs.fcfg.Compression != NoCompression {
- // We've now reached the end of this message block, if we want
- // to compress blocks then now's the time to do it.
- go lmb.recompressOnDiskIfNeeded()
- }
- }
if lmb, err = fs.newMsgBlockForWrite(); err != nil {
return nil, err
}
@@ -6173,13 +6162,9 @@ func (fs *fileStore) writeTombstoneNoFlush(seq uint64, ts int64) error {
return lmb.writeTombstoneNoFlush(seq, ts)
}
+// Lock should be held.
func (mb *msgBlock) recompressOnDiskIfNeeded() error {
alg := mb.fs.fcfg.Compression
- mb.mu.Lock()
- defer mb.mu.Unlock()
-
- origFN := mb.mfn // The original message block on disk.
- tmpFN := mb.mfn + compressTmpSuffix // The compressed block will be written here.
// Open up the file block and read in the entire contents into memory.
// One of two things will happen:
@@ -6188,7 +6173,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// 2. The block will be uncompressed, in which case we will compress it
// and then write it back out to disk, re-encrypting if necessary.
<-dios
- origBuf, err := os.ReadFile(origFN)
+ origBuf, err := os.ReadFile(mb.mfn)
dios <- struct{}{}
if err != nil {
@@ -6200,13 +6185,8 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// compression can be as efficient as possible on the raw data, whereas
// the encrypted ciphertext will not compress anywhere near as well.
// The block encryption also covers the optional compression metadata.
- if mb.bek != nil && len(origBuf) > 0 {
- bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
- if err != nil {
- return err
- }
- mb.bek = bek
- mb.bek.XORKeyStream(origBuf, origBuf)
+ if err = mb.encryptOrDecryptIfNeeded(origBuf); err != nil {
+ return err
}
meta := &CompressionInfo{}
@@ -6231,6 +6211,19 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
}
}
+ return mb.atomicOverwriteFile(origBuf, true)
+}
+
+// Lock should be held.
+func (mb *msgBlock) atomicOverwriteFile(buf []byte, allowCompress bool) error {
+ if mb.mfd != nil {
+ mb.closeFDsLockedNoCheck()
+ defer mb.enableForWriting(mb.fs.fip)
+ }
+
+ origFN := mb.mfn // The original message block on disk.
+ tmpFN := mb.mfn + blkTmpSuffix // The new block will be written here.
+
// Rather than modifying the existing block on disk (which is a dangerous
// operation if something goes wrong), create a new temporary file. We will
// write out the new block here and then swap the files around afterwards
@@ -6249,41 +6242,37 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
return err
}
- // The original buffer at this point is uncompressed, so we will now compress
- // it if needed. Note that if the selected algorithm is NoCompression, the
- // Compress function will just return the input buffer unmodified.
- cmpBuf, err := alg.Compress(origBuf)
- if err != nil {
- return errorCleanup(fmt.Errorf("failed to compress block: %w", err))
- }
+ alg := NoCompression
+ if calg := mb.fs.fcfg.Compression; calg != NoCompression && allowCompress {
+ alg = calg
+ // The original buffer at this point is uncompressed, so we will now compress
+ // it if needed. Note that if the selected algorithm is NoCompression, the
+ // Compress function will just return the input buffer unmodified.
+ if buf, err = alg.Compress(buf); err != nil {
+ return errorCleanup(fmt.Errorf("failed to compress block: %w", err))
+ }
- // We only need to write out the metadata header if compression is enabled.
- // If we're trying to uncompress the file on disk at this point, don't bother
- // writing metadata.
- if alg != NoCompression {
+ // We only need to write out the metadata header if compression is enabled.
+ // If we're trying to uncompress the file on disk at this point, don't bother
+ // writing metadata.
meta := &CompressionInfo{
Algorithm: alg,
- OriginalSize: uint64(len(origBuf)),
+ OriginalSize: uint64(len(buf)),
}
- cmpBuf = append(meta.MarshalMetadata(), cmpBuf...)
+ buf = append(meta.MarshalMetadata(), buf...)
}
// Re-encrypt the block if necessary.
- if mb.bek != nil && len(cmpBuf) > 0 {
- bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
- if err != nil {
- return errorCleanup(err)
- }
- mb.bek = bek
- mb.bek.XORKeyStream(cmpBuf, cmpBuf)
+ if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
+ return errorCleanup(err)
}
// Write the new block data (which might be compressed or encrypted) to the
// temporary file.
- if n, err := tmpFD.Write(cmpBuf); err != nil {
+ if n, err := tmpFD.Write(buf); err != nil {
return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err))
- } else if n != len(cmpBuf) {
- return errorCleanup(fmt.Errorf("short write to temporary file (%d != %d)", n, len(cmpBuf)))
+ } else if n != len(buf) {
+ return errorCleanup(fmt.Errorf("short write to temporary file (%d != %d)", n, len(buf)))
}
if err := tmpFD.Sync(); err != nil {
return errorCleanup(fmt.Errorf("failed to sync temporary file: %w", err))
@@ -6303,11 +6292,12 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
mb.cmp = alg
// Also update rbytes
- mb.rbytes = uint64(len(cmpBuf))
+ mb.rbytes = uint64(len(buf))
return nil
}
+// Lock should be held.
func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
var meta CompressionInfo
if n, err := meta.UnmarshalMetadata(buf); err != nil {
@@ -6328,6 +6318,19 @@ func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
}
}
+// Lock should be held.
+func (mb *msgBlock) encryptOrDecryptIfNeeded(buf []byte) error {
+ if mb.bek != nil && len(buf) > 0 {
+ bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
+ if err != nil {
+ return err
+ }
+ mb.bek = bek
+ mb.bek.XORKeyStream(buf, buf)
+ }
+ return nil
+}
+
// Lock should be held.
func (mb *msgBlock) ensureRawBytesLoaded() error {
if mb.rbytes > 0 {
@@ -6631,7 +6634,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// If we have a hole fill it.
for dseq := mbFirstSeq + uint64(len(idx)); dseq < seq; dseq++ {
idx = append(idx, dbit)
- if dms == 0 {
+ if dms == 0 && dseq != 0 {
mb.dmap.Insert(dseq)
}
}
@@ -6645,7 +6648,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
}
// Make sure our dmap has this entry if it was erased.
- if erased && dms == 0 {
+ if erased && dms == 0 && seq != 0 {
mb.dmap.Insert(seq)
}
@@ -7009,15 +7012,9 @@ checkCache:
mb.clearCacheAndOffset()
// Check if we need to decrypt.
- if mb.bek != nil && len(buf) > 0 {
- bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
- if err != nil {
- return err
- }
- mb.bek = bek
- mb.bek.XORKeyStream(buf, buf)
+ if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
+ return err
}
-
// Check for compression.
if buf, err = mb.decompressIfNeeded(buf); err != nil {
return err
@@ -8094,7 +8091,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
firstSeqNeedsUpdate = firstSeqNeedsUpdate || seq == fs.state.FirstSeq
} else if seq == fs.state.FirstSeq {
fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq) // new one.
- fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
+ if mb.first.ts == 0 {
+ fs.state.FirstTime = time.Time{}
+ } else {
+ fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
+ }
}
} else {
// Out of order delete.
@@ -8143,7 +8144,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
return purged, err
}
}
- // Flush any pending. If we change blocks the checkLastBlock() will flush any pending for us.
+ // Flush any pending. If we change blocks the newMsgBlockForWrite() will flush any pending for us.
if lmb := fs.lmb; lmb != nil {
lmb.flushPendingMsgs()
}
@@ -8435,8 +8436,17 @@ SKIP:
smb.mu.Unlock()
// Write any tombstones as needed.
- for _, tomb := range tombs {
- fs.writeTombstone(tomb.seq, tomb.ts)
+ // When writing multiple tombstones we will flush at the end.
+ if len(tombs) > 0 {
+ for _, tomb := range tombs {
+ if err := fs.writeTombstoneNoFlush(tomb.seq, tomb.ts); err != nil {
+ return purged, err
+ }
+ }
+ // Flush any pending. If we change blocks the newMsgBlockForWrite() will flush any pending for us.
+ if lmb := fs.lmb; lmb != nil {
+ lmb.flushPendingMsgs()
+ }
}
if deleted > 0 {
@@ -8601,55 +8611,118 @@ func (fs *fileStore) Truncate(seq uint64) error {
return ErrStoreSnapshotInProgress
}
- nlmb := fs.selectMsgBlock(seq)
- if nlmb == nil {
- fs.mu.Unlock()
- return ErrInvalidSequence
- }
- lsm, _, _ := nlmb.fetchMsgNoCopy(seq, nil)
- if lsm == nil {
- fs.mu.Unlock()
- return ErrInvalidSequence
+ var lsm *StoreMsg
+ smb := fs.selectMsgBlock(seq)
+ if smb != nil {
+ lsm, _, _ = smb.fetchMsgNoCopy(seq, nil)
}
- // Set lmb to nlmb and make sure writeable.
- fs.lmb = nlmb
- if err := nlmb.enableForWriting(fs.fip); err != nil {
- fs.mu.Unlock()
+ // Reset last so new block doesn't contain truncated sequences/timestamps.
+ var lastTime int64
+ if lsm != nil {
+ lastTime = lsm.ts
+ } else if smb != nil {
+ lastTime = smb.last.ts
+ } else {
+ lastTime = fs.state.LastTime.UnixNano()
+ }
+ fs.state.LastSeq = seq
+ fs.state.LastTime = time.Unix(0, lastTime).UTC()
+
+ // Always create a new write block for any tombstones.
+ // We'll truncate the selected message block as the last step, so can't write tombstones to it.
+ // If we end up not needing to write tombstones, this block will be cleaned up at the end.
+ tmb, err := fs.newMsgBlockForWrite()
+ if err != nil {
return err
}
- // Collect all tombstones, we want to put these back so we can survive
- // a restore without index.db properly.
- var tombs []msgId
- tombs = append(tombs, nlmb.tombs()...)
+
+ // If the selected block is not found or the message was deleted, we'll need to write a tombstone
+ // at the truncated sequence so we don't roll backward on our last sequence and timestamp.
+ if lsm == nil {
+ fs.writeTombstone(seq, lastTime)
+ }
var purged, bytes uint64
- // Truncate our new last message block.
- nmsgs, nbytes, err := nlmb.truncate(lsm)
- if err != nil {
- fs.mu.Unlock()
- return fmt.Errorf("nlmb.truncate: %w", err)
- }
- // Account for the truncated msgs and bytes.
- purged += nmsgs
- bytes += nbytes
-
// Remove any left over msg blocks.
- getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] }
- for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() {
+ getLastMsgBlock := func() *msgBlock {
+ // Start at one before last, tmb will be the last most of the time
+ // unless a new block gets added for tombstones.
+ for i := len(fs.blks) - 2; i >= 0; i-- {
+ if mb := fs.blks[i]; mb.index < tmb.index {
+ return mb
+ }
+ }
+ return nil
+ }
+ for mb := getLastMsgBlock(); mb != nil && mb != smb; mb = getLastMsgBlock() {
mb.mu.Lock()
- // We do this to load tombs.
- tombs = append(tombs, mb.tombsLocked()...)
purged += mb.msgs
bytes += mb.bytes
+
+ // We could have tombstones for messages before the truncated sequence.
+ // Need to store those for blocks we're about to remove.
+ if tombs := mb.tombsLocked(); len(tombs) > 0 {
+ // Temporarily unlock while we write tombstones.
+ mb.mu.Unlock()
+ for _, tomb := range tombs {
+ if tomb.seq < seq {
+ fs.writeTombstone(tomb.seq, tomb.ts)
+ }
+ }
+ mb.mu.Lock()
+ }
fs.removeMsgBlock(mb)
mb.mu.Unlock()
}
+ hasWrittenTombstones := len(tmb.tombs()) > 0
+ if smb != nil {
+ // Make sure writeable.
+ smb.mu.Lock()
+ if err := smb.enableForWriting(fs.fip); err != nil {
+ smb.mu.Unlock()
+ fs.mu.Unlock()
+ return err
+ }
+
+ // Truncate our selected message block.
+ nmsgs, nbytes, err := smb.truncate(seq, lastTime)
+ if err != nil {
+ smb.mu.Unlock()
+ fs.mu.Unlock()
+ return fmt.Errorf("smb.truncate: %w", err)
+ }
+ // Account for the truncated msgs and bytes.
+ purged += nmsgs
+ bytes += nbytes
+
+ // The selected message block is not the last anymore, need to close down resources.
+ if hasWrittenTombstones {
+ // Quit our loops.
+ if smb.qch != nil {
+ close(smb.qch)
+ smb.qch = nil
+ }
+ smb.closeFDsLockedNoCheck()
+ smb.recompressOnDiskIfNeeded()
+ }
+ smb.mu.Unlock()
+ }
+
+ // If no tombstones were written, we can remove the block and
+ // purely rely on the selected block as the last block.
+ if !hasWrittenTombstones {
+ fs.lmb = smb
+ tmb.mu.Lock()
+ fs.removeMsgBlock(tmb)
+ tmb.mu.Unlock()
+ }
+
// Reset last.
- fs.state.LastSeq = lsm.seq
- fs.state.LastTime = time.Unix(0, lsm.ts).UTC()
+ fs.state.LastSeq = seq
+ fs.state.LastTime = time.Unix(0, lastTime).UTC()
// Update msgs and bytes.
if purged > fs.state.Msgs {
purged = fs.state.Msgs
@@ -8663,16 +8736,6 @@ func (fs *fileStore) Truncate(seq uint64) error {
// Reset our subject lookup info.
fs.resetGlobalPerSubjectInfo()
- // Always create new write block.
- fs.newMsgBlockForWrite()
-
- // Write any tombstones as needed.
- for _, tomb := range tombs {
- if tomb.seq <= lsm.seq {
- fs.writeTombstone(tomb.seq, tomb.ts)
- }
- }
-
// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go
index d5e3c9f11c..c119da6333 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go
@@ -40,15 +40,15 @@ import (
// JetStreamConfig determines this server's configuration.
// MaxMemory and MaxStore are in bytes.
type JetStreamConfig struct {
- MaxMemory int64 `json:"max_memory"`
- MaxStore int64 `json:"max_storage"`
- StoreDir string `json:"store_dir,omitempty"`
- SyncInterval time.Duration `json:"sync_interval,omitempty"`
- SyncAlways bool `json:"sync_always,omitempty"`
- Domain string `json:"domain,omitempty"`
- CompressOK bool `json:"compress_ok,omitempty"`
- UniqueTag string `json:"unique_tag,omitempty"`
- Strict bool `json:"strict,omitempty"`
+ MaxMemory int64 `json:"max_memory"` // MaxMemory is the maximum size of memory type streams
+ MaxStore int64 `json:"max_storage"` // MaxStore is the maximum size of file store type streams
+ StoreDir string `json:"store_dir,omitempty"` // StoreDir is where storage files are stored
+ SyncInterval time.Duration `json:"sync_interval,omitempty"` // SyncInterval is how frequently we sync to disk in the background by calling fsync
+ SyncAlways bool `json:"sync_always,omitempty"` // SyncAlways indicates flushes are done after every write
+ Domain string `json:"domain,omitempty"` // Domain is the JetStream domain
+ CompressOK bool `json:"compress_ok,omitempty"` // CompressOK indicates if compression is supported
+ UniqueTag string `json:"unique_tag,omitempty"` // UniqueTag is the unique tag assigned to this instance
+ Strict bool `json:"strict,omitempty"` // Strict indicates if strict JSON parsing is performed
}
// Statistics about JetStream for this server.
@@ -91,11 +91,12 @@ type JetStreamAccountStats struct {
Tiers map[string]JetStreamTier `json:"tiers,omitempty"` // indexed by tier name
}
+// JetStreamAPIStats holds stats about the API usage for this server
type JetStreamAPIStats struct {
- Level int `json:"level"`
- Total uint64 `json:"total"`
- Errors uint64 `json:"errors"`
- Inflight uint64 `json:"inflight,omitempty"`
+ Level int `json:"level"` // Level is the active API level this server implements
+ Total uint64 `json:"total"` // Total is the total API requests received since start
+ Errors uint64 `json:"errors"` // Errors is the total API requests that resulted in error responses
+ Inflight uint64 `json:"inflight,omitempty"` // Inflight are the number of API requests currently being served
}
// This is for internal accounting for JetStream for this server.
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go
index 84f3675dff..edb54368bd 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go
@@ -1225,6 +1225,8 @@ func (js *jetStream) monitorCluster() {
doSnapshot()
return
case <-rqch:
+ // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer.
+ doSnapshot()
return
case <-qch:
// Clean signal from shutdown routine so do best effort attempt to snapshot meta layer.
@@ -1280,10 +1282,10 @@ func (js *jetStream) monitorCluster() {
} else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
}
- ce.ReturnToPool()
} else {
s.Warnf("Error applying JetStream cluster entries: %v", err)
}
+ ce.ReturnToPool()
}
aq.recycle(&ces)
@@ -2455,6 +2457,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
ne, nb = n.Applied(ce.Index)
ce.ReturnToPool()
} else {
+ // Make sure to clean up.
+ ce.ReturnToPool()
// Our stream was closed out from underneath of us, simply return here.
if err == errStreamClosed || err == errCatchupStreamStopped || err == ErrServerNotRunning {
aq.recycle(&ces)
@@ -4862,13 +4866,14 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if n.NeedSnapshot() {
doSnapshot(true)
}
- } else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
+ continue
+ }
+ if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
var ne, nb uint64
// We can't guarantee writes are flushed while we're shutting down. Just rely on replay during recovery.
if !js.isShuttingDown() {
ne, nb = n.Applied(ce.Index)
}
- ce.ReturnToPool()
// If we have at least min entries to compact, go ahead and snapshot/compact.
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
doSnapshot(false)
@@ -4876,6 +4881,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
} else if err != errConsumerClosed {
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
}
+ ce.ReturnToPool()
}
aq.recycle(&ces)
@@ -5058,8 +5064,20 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
o.ldt = time.Now()
// Need to send message to the client, since we have quorum to do so now.
if pmsg, ok := o.pendingDeliveries[sseq]; ok {
+ // Copy delivery subject and sequence first, as the send returns it to the pool and clears it.
+ dsubj, seq := pmsg.dsubj, pmsg.seq
o.outq.send(pmsg)
delete(o.pendingDeliveries, sseq)
+
+ // Might need to send a request timeout after sending the last replicated delivery.
+ if wd, ok := o.waitingDeliveries[dsubj]; ok && wd.seq == seq {
+ if wd.pn > 0 || wd.pb > 0 {
+ hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wd.pn, JSPullRequestPendingBytes, wd.pb)
+ o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
+ }
+ wd.recycle()
+ delete(o.waitingDeliveries, dsubj)
+ }
}
o.mu.Unlock()
if err != nil {
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_errors_generated.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_errors_generated.go
index 30ed884e0f..162ca4f027 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_errors_generated.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_errors_generated.go
@@ -197,6 +197,9 @@ const (
// JSConsumerPushMaxWaitingErr consumer in push mode can not set max waiting
JSConsumerPushMaxWaitingErr ErrorIdentifier = 10080
+ // JSConsumerPushWithPriorityGroupErr priority groups can not be used with push consumers
+ JSConsumerPushWithPriorityGroupErr ErrorIdentifier = 10178
+
// JSConsumerReplacementWithDifferentNameErr consumer replacement durable config not the same
JSConsumerReplacementWithDifferentNameErr ErrorIdentifier = 10106
@@ -570,6 +573,7 @@ var (
JSConsumerPullRequiresAckErr: {Code: 400, ErrCode: 10084, Description: "consumer in pull mode requires explicit ack policy on workqueue stream"},
JSConsumerPullWithRateLimitErr: {Code: 400, ErrCode: 10086, Description: "consumer in pull mode can not have rate limit set"},
JSConsumerPushMaxWaitingErr: {Code: 400, ErrCode: 10080, Description: "consumer in push mode can not set max waiting"},
+ JSConsumerPushWithPriorityGroupErr: {Code: 400, ErrCode: 10178, Description: "priority groups can not be used with push consumers"},
JSConsumerReplacementWithDifferentNameErr: {Code: 400, ErrCode: 10106, Description: "consumer replacement durable config not the same"},
JSConsumerReplicasExceedsStream: {Code: 400, ErrCode: 10126, Description: "consumer config replica count exceeds parent stream"},
JSConsumerReplicasShouldMatchStream: {Code: 400, ErrCode: 10134, Description: "consumer config replicas must match interest retention stream's replicas"},
@@ -1397,6 +1401,16 @@ func NewJSConsumerPushMaxWaitingError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSConsumerPushMaxWaitingErr]
}
+// NewJSConsumerPushWithPriorityGroupError creates a new JSConsumerPushWithPriorityGroupErr error: "priority groups can not be used with push consumers"
+func NewJSConsumerPushWithPriorityGroupError(opts ...ErrorOption) *ApiError {
+ eopts := parseOpts(opts)
+ if ae, ok := eopts.err.(*ApiError); ok {
+ return ae
+ }
+
+ return ApiErrors[JSConsumerPushWithPriorityGroupErr]
+}
+
// NewJSConsumerReplacementWithDifferentNameError creates a new JSConsumerReplacementWithDifferentNameErr error: "consumer replacement durable config not the same"
func NewJSConsumerReplacementWithDifferentNameError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jwt.go b/vendor/github.com/nats-io/nats-server/v2/server/jwt.go
index e8da5213cc..04d7dc60a3 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/jwt.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/jwt.go
@@ -70,11 +70,20 @@ func wipeSlice(buf []byte) {
func validateTrustedOperators(o *Options) error {
if len(o.TrustedOperators) == 0 {
// if we have no operator, default sentinel shouldn't be set
- if o.DefaultSentinel != "" {
+ if o.DefaultSentinel != _EMPTY_ {
return fmt.Errorf("default sentinel requires operators and accounts")
}
return nil
}
+ if o.DefaultSentinel != _EMPTY_ {
+ juc, err := jwt.DecodeUserClaims(o.DefaultSentinel)
+ if err != nil {
+ return fmt.Errorf("default sentinel JWT not valid")
+ }
+ if !juc.BearerToken {
+ return fmt.Errorf("default sentinel must be a bearer token")
+ }
+ }
if o.AccountResolver == nil {
return fmt.Errorf("operators require an account resolver to be configured")
}
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go
index 28b25f9780..133b6408f8 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"
+ "github.com/nats-io/nats-server/v2/server/ats"
+
"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nats-server/v2/server/gsl"
"github.com/nats-io/nats-server/v2/server/stree"
@@ -71,6 +73,9 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
}
}
+ // Register with access time service.
+ ats.Register()
+
return ms, nil
}
@@ -86,7 +91,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
ms.cfg = *cfg
// Create or delete the THW if needed.
if cfg.AllowMsgTTL && ms.ttls == nil {
- ms.ttls = thw.NewHashWheel()
+ ms.recoverTTLState()
} else if !cfg.AllowMsgTTL && ms.ttls != nil {
ms.ttls = nil
}
@@ -125,6 +130,30 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
return nil
}
+// Lock should be held.
+func (ms *memStore) recoverTTLState() {
+ ms.ttls = thw.NewHashWheel()
+ if ms.state.Msgs == 0 {
+ return
+ }
+
+ var (
+ seq uint64
+ smv StoreMsg
+ sm *StoreMsg
+ )
+ defer ms.resetAgeChk(0)
+ for sm, seq, _ = ms.loadNextMsgLocked(fwcs, true, 0, &smv); sm != nil; sm, seq, _ = ms.loadNextMsgLocked(fwcs, true, seq+1, &smv) {
+ if len(sm.hdr) == 0 {
+ continue
+ }
+ if ttl, _ := getMessageTTL(sm.hdr); ttl > 0 {
+ expires := time.Duration(sm.ts) + (time.Second * time.Duration(ttl))
+ ms.ttls.Add(seq, int64(expires))
+ }
+ }
+}
+
// Stores a raw message with expected sequence number and timestamp.
// Lock should be held.
func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, ttl int64) error {
@@ -286,7 +315,7 @@ func (ms *memStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, i
// SkipMsg will use the next sequence number but not store anything.
func (ms *memStore) SkipMsg() uint64 {
// Grab time.
- now := time.Now().UTC()
+ now := time.Unix(0, ats.AccessTime()).UTC()
ms.mu.Lock()
seq := ms.state.LastSeq + 1
@@ -294,7 +323,7 @@ func (ms *memStore) SkipMsg() uint64 {
ms.state.LastTime = now
if ms.state.Msgs == 0 {
ms.state.FirstSeq = seq + 1
- ms.state.FirstTime = now
+ ms.state.FirstTime = time.Time{}
} else {
ms.dmap.Insert(seq)
}
@@ -305,7 +334,7 @@ func (ms *memStore) SkipMsg() uint64 {
// Skip multiple msgs.
func (ms *memStore) SkipMsgs(seq uint64, num uint64) error {
// Grab time.
- now := time.Now().UTC()
+ now := time.Unix(0, ats.AccessTime()).UTC()
ms.mu.Lock()
defer ms.mu.Unlock()
@@ -322,7 +351,7 @@ func (ms *memStore) SkipMsgs(seq uint64, num uint64) error {
ms.state.LastSeq = lseq
ms.state.LastTime = now
if ms.state.Msgs == 0 {
- ms.state.FirstSeq, ms.state.FirstTime = lseq+1, now
+ ms.state.FirstSeq, ms.state.FirstTime = lseq+1, time.Time{}
} else {
for ; seq <= lseq; seq++ {
ms.dmap.Insert(seq)
@@ -1075,7 +1104,7 @@ func (ms *memStore) expireMsgs() {
}
if sdmEnabled {
if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok {
- sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0
+ sdm := last && isSubjectDeleteMarker(sm.hdr)
ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
@@ -1105,7 +1134,7 @@ func (ms *memStore) expireMsgs() {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
- ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0})
+ ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)})
return false
}
} else {
@@ -1292,7 +1321,9 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.state.FirstTime = time.Time{}
ms.state.Bytes = 0
ms.state.Msgs = 0
- ms.msgs = make(map[uint64]*StoreMsg)
+ if ms.msgs != nil {
+ ms.msgs = make(map[uint64]*StoreMsg)
+ }
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.dmap.Empty()
ms.sdm.empty()
@@ -1421,9 +1452,9 @@ func (ms *memStore) Truncate(seq uint64) error {
ms.mu.Lock()
lsm, ok := ms.msgs[seq]
- if !ok {
- ms.mu.Unlock()
- return ErrInvalidSequence
+ lastTime := ms.state.LastTime
+ if ok && lsm != nil {
+ lastTime = time.Unix(0, lsm.ts).UTC()
}
for i := ms.state.LastSeq; i > seq; i-- {
@@ -1438,8 +1469,8 @@ func (ms *memStore) Truncate(seq uint64) error {
}
}
// Reset last.
- ms.state.LastSeq = lsm.seq
- ms.state.LastTime = time.Unix(0, lsm.ts).UTC()
+ ms.state.LastSeq = seq
+ ms.state.LastTime = lastTime
// Update msgs and bytes.
if purged > ms.state.Msgs {
purged = ms.state.Msgs
@@ -1585,7 +1616,11 @@ func (ms *memStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *S
func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
+ return ms.loadNextMsgLocked(filter, wc, start, smp)
+}
+// Lock should be held.
+func (ms *memStore) loadNextMsgLocked(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) {
if start < ms.state.FirstSeq {
start = ms.state.FirstSeq
}
@@ -1927,15 +1962,24 @@ func (ms *memStore) Delete() error {
}
func (ms *memStore) Stop() error {
- // These can't come back, so stop is same as Delete.
- ms.Purge()
ms.mu.Lock()
+ if ms.msgs == nil {
+ ms.mu.Unlock()
+ return nil
+ }
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil
}
ms.msgs = nil
ms.mu.Unlock()
+
+ // These can't come back, so stop is same as Delete.
+ ms.Purge()
+
+ // Unregister from the access time service.
+ ats.Unregister()
+
return nil
}
diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go
index 6a0f9c6ac8..b160d9d89a 100644
--- a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go
+++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go
@@ -1184,184 +1184,185 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
// Varz will output server information on the monitoring port at /varz.
type Varz struct {
- ID string `json:"server_id"`
- Name string `json:"server_name"`
- Version string `json:"version"`
- Proto int `json:"proto"`
- GitCommit string `json:"git_commit,omitempty"`
- GoVersion string `json:"go"`
- Host string `json:"host"`
- Port int `json:"port"`
- AuthRequired bool `json:"auth_required,omitempty"`
- TLSRequired bool `json:"tls_required,omitempty"`
- TLSVerify bool `json:"tls_verify,omitempty"`
- TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
- IP string `json:"ip,omitempty"`
- ClientConnectURLs []string `json:"connect_urls,omitempty"`
- WSConnectURLs []string `json:"ws_connect_urls,omitempty"`
- MaxConn int `json:"max_connections"`
- MaxSubs int `json:"max_subscriptions,omitempty"`
- PingInterval time.Duration `json:"ping_interval"`
- MaxPingsOut int `json:"ping_max"`
- HTTPHost string `json:"http_host"`
- HTTPPort int `json:"http_port"`
- HTTPBasePath string `json:"http_base_path"`
- HTTPSPort int `json:"https_port"`
- AuthTimeout float64 `json:"auth_timeout"`
- MaxControlLine int32 `json:"max_control_line"`
- MaxPayload int `json:"max_payload"`
- MaxPending int64 `json:"max_pending"`
- Cluster ClusterOptsVarz `json:"cluster,omitempty"`
- Gateway GatewayOptsVarz `json:"gateway,omitempty"`
- LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"`
- MQTT MQTTOptsVarz `json:"mqtt,omitempty"`
- Websocket WebsocketOptsVarz `json:"websocket,omitempty"`
- JetStream JetStreamVarz `json:"jetstream,omitempty"`
- TLSTimeout float64 `json:"tls_timeout"`
- WriteDeadline time.Duration `json:"write_deadline"`
- Start time.Time `json:"start"`
- Now time.Time `json:"now"`
- Uptime string `json:"uptime"`
- Mem int64 `json:"mem"`
- Cores int `json:"cores"`
- MaxProcs int `json:"gomaxprocs"`
- MemLimit int64 `json:"gomemlimit,omitempty"`
- CPU float64 `json:"cpu"`
- Connections int `json:"connections"`
- TotalConnections uint64 `json:"total_connections"`
- Routes int `json:"routes"`
- Remotes int `json:"remotes"`
- Leafs int `json:"leafnodes"`
- InMsgs int64 `json:"in_msgs"`
- OutMsgs int64 `json:"out_msgs"`
- InBytes int64 `json:"in_bytes"`
- OutBytes int64 `json:"out_bytes"`
- SlowConsumers int64 `json:"slow_consumers"`
- Subscriptions uint32 `json:"subscriptions"`
- HTTPReqStats map[string]uint64 `json:"http_req_stats"`
- ConfigLoadTime time.Time `json:"config_load_time"`
- ConfigDigest string `json:"config_digest"`
- Tags jwt.TagList `json:"tags,omitempty"`
- TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"`
- TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"`
- SystemAccount string `json:"system_account,omitempty"`
- PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"`
- OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"`
- SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"`
+ ID string `json:"server_id"` // ID is the unique server ID generated at start
+ Name string `json:"server_name"` // Name is the configured server name, equals ID when not set
+ Version string `json:"version"` // Version is the version of the running server
+ Proto int `json:"proto"` // Proto is the protocol version this server supports
+ GitCommit string `json:"git_commit,omitempty"` // GitCommit is the git repository commit hash that the build corresponds with
+ GoVersion string `json:"go"` // GoVersion is the version of Go used to build this binary
+ Host string `json:"host"` // Host is the hostname the server runs on
+ Port int `json:"port"` // Port is the port the server listens on for client connections
+ AuthRequired bool `json:"auth_required,omitempty"` // AuthRequired indicates if users are required to authenticate to join the server
+ TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if connections must use TLS when connecting to this server
+ TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full TLS verification will be performed
+ TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if the OCSP protocol will be used to verify peers
+ IP string `json:"ip,omitempty"` // IP is the IP address the server listens on if set
+ ClientConnectURLs []string `json:"connect_urls,omitempty"` // ClientConnectURLs is the list of URLs NATS clients can use to connect to this server
+ WSConnectURLs []string `json:"ws_connect_urls,omitempty"` // WSConnectURLs is the list of URLs websocket clients can use to connect to this server
+ MaxConn int `json:"max_connections"` // MaxConn is the maximum amount of connections the server can accept
+ MaxSubs int `json:"max_subscriptions,omitempty"` // MaxSubs is the maximum amount of subscriptions the server can manage
+ PingInterval time.Duration `json:"ping_interval"` // PingInterval is the interval the server will send PING messages during periods of inactivity on a connection
+ MaxPingsOut int `json:"ping_max"` // MaxPingsOut is the number of unanswered PINGs after which the connection will be considered stale
+ HTTPHost string `json:"http_host"` // HTTPHost is the HTTP host monitoring connections are accepted on
+ HTTPPort int `json:"http_port"` // HTTPPort is the port monitoring connections are accepted on
+ HTTPBasePath string `json:"http_base_path"` // HTTPBasePath is the path prefix for access to monitor endpoints
+ HTTPSPort int `json:"https_port"` // HTTPSPort is the HTTPS host monitoring connections are accepted on`
+ AuthTimeout float64 `json:"auth_timeout"` // AuthTimeout is the amount of seconds connections have to complete authentication
+ MaxControlLine int32 `json:"max_control_line"` // MaxControlLine is the amount of bytes a signal control message may be
+ MaxPayload int `json:"max_payload"` // MaxPayload is the maximum amount of bytes a message may have as payload
+ MaxPending int64 `json:"max_pending"` // MaxPending is the maximum amount of unprocessed bytes a connection may have
+ Cluster ClusterOptsVarz `json:"cluster,omitempty"` // Cluster is the Cluster state
+ Gateway GatewayOptsVarz `json:"gateway,omitempty"` // Gateway is the Super Cluster state
+ LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` // LeafNode is the Leafnode state
+ MQTT MQTTOptsVarz `json:"mqtt,omitempty"` // MQTT is the MQTT state
+ Websocket WebsocketOptsVarz `json:"websocket,omitempty"` // Websocket is the Websocket client state
+ JetStream JetStreamVarz `json:"jetstream,omitempty"` // JetStream is the JetStream state
+ TLSTimeout float64 `json:"tls_timeout"` // TLSTimeout is how long TLS operations have to complete
+ WriteDeadline time.Duration `json:"write_deadline"` // WriteDeadline is the maximum time writes to sockets have to complete
+ Start time.Time `json:"start"` // Start is time when the server was started
+ Now time.Time `json:"now"` // Now is the current time of the server
+ Uptime string `json:"uptime"` // Uptime is how long the server has been running
+ Mem int64 `json:"mem"` // Mem is the resident memory allocation
+ Cores int `json:"cores"` // Cores is the number of cores the process has access to
+ MaxProcs int `json:"gomaxprocs"` // MaxProcs is the configured GOMAXPROCS value
+ MemLimit int64 `json:"gomemlimit,omitempty"` // MemLimit is the configured GOMEMLIMIT value
+ CPU float64 `json:"cpu"` // CPU is the current total CPU usage
+ Connections int `json:"connections"` // Connections is the current connected connections
+ TotalConnections uint64 `json:"total_connections"` // TotalConnections is the total connections the server have ever handled
+ Routes int `json:"routes"` // Routes is the number of connected route servers
+ Remotes int `json:"remotes"` // Remotes is the configured route remote endpoints
+ Leafs int `json:"leafnodes"` // Leafs is the number connected leafnode clients
+ InMsgs int64 `json:"in_msgs"` // InMsgs is the number of messages this server received
+ OutMsgs int64 `json:"out_msgs"` // OutMsgs is the number of message this server sent
+ InBytes int64 `json:"in_bytes"` // InBytes is the number of bytes this server received
+ OutBytes int64 `json:"out_bytes"` // OutMsgs is the number of bytes this server sent
+ SlowConsumers int64 `json:"slow_consumers"` // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers
+ Subscriptions uint32 `json:"subscriptions"` // Subscriptions is the count of active subscriptions
+ HTTPReqStats map[string]uint64 `json:"http_req_stats"` // HTTPReqStats is the number of requests each HTTP endpoint received
+ ConfigLoadTime time.Time `json:"config_load_time"` // ConfigLoadTime is the time the configuration was loaded or reloaded
+ ConfigDigest string `json:"config_digest"` // ConfigDigest is a calculated hash of the current configuration
+ Tags jwt.TagList `json:"tags,omitempty"` // Tags are the tags assigned to the server in configuration
+ Metadata map[string]string `json:"metadata,omitempty"` // Metadata is the metadata assigned to the server in configuration
+ TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"` // TrustedOperatorsJwt is the JWTs for all trusted operators
+ TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"` // TrustedOperatorsClaim is the decoded claims for each trusted operator
+ SystemAccount string `json:"system_account,omitempty"` // SystemAccount is the name of the System account
+ PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` // PinnedAccountFail is how often user logon fails due to the issuer account not being pinned.
+ OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` // OCSPResponseCache is the state of the OCSP cache // OCSPResponseCache holds information about
+ SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats is statistics about all detected Slow Consumer
}
// JetStreamVarz contains basic runtime information about jetstream
type JetStreamVarz struct {
- Config *JetStreamConfig `json:"config,omitempty"`
- Stats *JetStreamStats `json:"stats,omitempty"`
- Meta *MetaClusterInfo `json:"meta,omitempty"`
- Limits *JSLimitOpts `json:"limits,omitempty"`
+ Config *JetStreamConfig `json:"config,omitempty"` // Config is the active JetStream configuration
+ Stats *JetStreamStats `json:"stats,omitempty"` // Stats is the statistics for the JetStream server
+ Meta *MetaClusterInfo `json:"meta,omitempty"` // Meta is information about the JetStream metalayer
+ Limits *JSLimitOpts `json:"limits,omitempty"` // Limits are the configured JetStream limits
}
// ClusterOptsVarz contains monitoring cluster information
type ClusterOptsVarz struct {
- Name string `json:"name,omitempty"`
- Host string `json:"addr,omitempty"`
- Port int `json:"cluster_port,omitempty"`
- AuthTimeout float64 `json:"auth_timeout,omitempty"`
- URLs []string `json:"urls,omitempty"`
- TLSTimeout float64 `json:"tls_timeout,omitempty"`
- TLSRequired bool `json:"tls_required,omitempty"`
- TLSVerify bool `json:"tls_verify,omitempty"`
- PoolSize int `json:"pool_size,omitempty"`
+ Name string `json:"name,omitempty"` // Name is the configured cluster name
+ Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections
+ Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections
+ AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
+ URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs
+ TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
+ TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
+ TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
+ PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size
}
// GatewayOptsVarz contains monitoring gateway information
type GatewayOptsVarz struct {
- Name string `json:"name,omitempty"`
- Host string `json:"host,omitempty"`
- Port int `json:"port,omitempty"`
- AuthTimeout float64 `json:"auth_timeout,omitempty"`
- TLSTimeout float64 `json:"tls_timeout,omitempty"`
- TLSRequired bool `json:"tls_required,omitempty"`
- TLSVerify bool `json:"tls_verify,omitempty"`
- Advertise string `json:"advertise,omitempty"`
- ConnectRetries int `json:"connect_retries,omitempty"`
- Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"`
- RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster
+ Name string `json:"name,omitempty"` // Name is the configured cluster name
+ Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections
+ Port int `json:"port,omitempty"` // Port is the post gateway connections listens on
+ AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
+ TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
+ TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
+ TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
+ Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients
+ ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
+ Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes
+ RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected
}
// RemoteGatewayOptsVarz contains monitoring remote gateway information
type RemoteGatewayOptsVarz struct {
- Name string `json:"name"`
- TLSTimeout float64 `json:"tls_timeout,omitempty"`
- URLs []string `json:"urls,omitempty"`
+ Name string `json:"name"` // Name is the name of the remote gateway
+ TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
+ URLs []string `json:"urls,omitempty"` // URLs is the list of Gateway URLs
}
// LeafNodeOptsVarz contains monitoring leaf node information
type LeafNodeOptsVarz struct {
- Host string `json:"host,omitempty"`
- Port int `json:"port,omitempty"`
- AuthTimeout float64 `json:"auth_timeout,omitempty"`
- TLSTimeout float64 `json:"tls_timeout,omitempty"`
- TLSRequired bool `json:"tls_required,omitempty"`
- TLSVerify bool `json:"tls_verify,omitempty"`
- Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"`
- TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
+ Host string `json:"host,omitempty"` // Host is the host the server listens on
+ Port int `json:"port,omitempty"` // Port is the port the server listens on
+ AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time Leafnode connections have to complete authentication
+ TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
+ TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
+ TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
+ Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"` // Remotes is state of configured Leafnode remotes
+ TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed
}
// DenyRules Contains lists of subjects not allowed to be imported/exported
type DenyRules struct {
- Exports []string `json:"exports,omitempty"`
- Imports []string `json:"imports,omitempty"`
+ Exports []string `json:"exports,omitempty"` // Exports are denied exports
+ Imports []string `json:"imports,omitempty"` // Imports are denied imports
}
// RemoteLeafOptsVarz contains monitoring remote leaf node information
type RemoteLeafOptsVarz struct {
- LocalAccount string `json:"local_account,omitempty"`
- TLSTimeout float64 `json:"tls_timeout,omitempty"`
- URLs []string `json:"urls,omitempty"`
- Deny *DenyRules `json:"deny,omitempty"`
- TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
+ LocalAccount string `json:"local_account,omitempty"` // LocalAccount is the local account this leaf is logged into
+ TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
+ URLs []string `json:"urls,omitempty"` // URLs is the list of URLs for the remote Leafnode connection
+ Deny *DenyRules `json:"deny,omitempty"` // Deny is the configured import and exports that the Leafnode may not access
+ TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
}
// MQTTOptsVarz contains monitoring MQTT information
type MQTTOptsVarz struct {
- Host string `json:"host,omitempty"`
- Port int `json:"port,omitempty"`
- NoAuthUser string `json:"no_auth_user,omitempty"`
- AuthTimeout float64 `json:"auth_timeout,omitempty"`
- TLSMap bool `json:"tls_map,omitempty"`
- TLSTimeout float64 `json:"tls_timeout,omitempty"`
- TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"`
- JsDomain string `json:"js_domain,omitempty"`
- AckWait time.Duration `json:"ack_wait,omitempty"`
- MaxAckPending uint16 `json:"max_ack_pending,omitempty"`
- TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
+ Host string `json:"host,omitempty"` // Host is the host the server listens on
+ Port int `json:"port,omitempty"` // Port is the port the server listens on
+ NoAuthUser string `json:"no_auth_user,omitempty"` // NoAuthUser is the user that will be used for unauthenticated connections
+ AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is how long authentication has to complete
+ TLSMap bool `json:"tls_map,omitempty"` // TLSMap indicates if TLS Mapping is enabled
+ TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
+ TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` // TLSPinnedCerts is the list of certificates pinned to this connection
+ JsDomain string `json:"js_domain,omitempty"` // JsDomain is the JetStream domain used for MQTT state
+ AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete
+ MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow
+ TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
}
// WebsocketOptsVarz contains monitoring websocket information
type WebsocketOptsVarz struct {
- Host string `json:"host,omitempty"`
- Port int `json:"port,omitempty"`
- Advertise string `json:"advertise,omitempty"`
- NoAuthUser string `json:"no_auth_user,omitempty"`
- JWTCookie string `json:"jwt_cookie,omitempty"`
- HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"`
- AuthTimeout float64 `json:"auth_timeout,omitempty"`
- NoTLS bool `json:"no_tls,omitempty"`
- TLSMap bool `json:"tls_map,omitempty"`
- TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"`
- SameOrigin bool `json:"same_origin,omitempty"`
- AllowedOrigins []string `json:"allowed_origins,omitempty"`
- Compression bool `json:"compression,omitempty"`
- TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
+ Host string `json:"host,omitempty"` // Host is the host the server listens on
+ Port int `json:"port,omitempty"` // Port is the port the server listens on
+ Advertise string `json:"advertise,omitempty"` // Advertise is the connection URL the server advertises
+ NoAuthUser string `json:"no_auth_user,omitempty"` // NoAuthUser is the user that will be used for unauthenticated connections
+ JWTCookie string `json:"jwt_cookie,omitempty"` // JWTCookie is the name of a cookie the server will read for the connection JWT
+ HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"` // HandshakeTimeout is how long the connection has to complete the websocket setup
+ AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is how long authentication has to complete
+ NoTLS bool `json:"no_tls,omitempty"` // NoTLS indicates if TLS is disabled
+ TLSMap bool `json:"tls_map,omitempty"` // TLSMap indicates if TLS Mapping is enabled
+ TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` // TLSPinnedCerts is the list of certificates pinned to this connection
+ SameOrigin bool `json:"same_origin,omitempty"` // SameOrigin indicates if same origin connections are allowed
+ AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins
+ Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported
+ TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
}
// OCSPResponseCacheVarz contains OCSP response cache information
type OCSPResponseCacheVarz struct {
- Type string `json:"cache_type,omitempty"`
- Hits int64 `json:"cache_hits,omitempty"`
- Misses int64 `json:"cache_misses,omitempty"`
- Responses int64 `json:"cached_responses,omitempty"`
- Revokes int64 `json:"cached_revoked_responses,omitempty"`
- Goods int64 `json:"cached_good_responses,omitempty"`
- Unknowns int64 `json:"cached_unknown_responses,omitempty"`
+ Type string `json:"cache_type,omitempty"` // Type is the kind of cache being used
+ Hits int64 `json:"cache_hits,omitempty"` // Hits is how many times the cache was able to answer a request
+ Misses int64 `json:"cache_misses,omitempty"` // Misses is how many times the cache failed to answer a request
+ Responses int64 `json:"cached_responses,omitempty"` // Responses is how many responses are currently stored in the cache
+ Revokes int64 `json:"cached_revoked_responses,omitempty"` // Revokes is how many of the stored cache entries are revokes
+ Goods int64 `json:"cached_good_responses,omitempty"` // Goods is how many of the stored cache entries are good responses
+ Unknowns int64 `json:"cached_unknown_responses,omitempty"` // Unknowns is how many of the stored cache entries are unknown responses
}
// VarzOptions are the options passed to Varz().
@@ -1370,10 +1371,10 @@ type VarzOptions struct{}
// SlowConsumersStats contains information about the slow consumers from different type of connections.
type SlowConsumersStats struct {
- Clients uint64 `json:"clients"`
- Routes uint64 `json:"routes"`
- Gateways uint64 `json:"gateways"`
- Leafs uint64 `json:"leafs"`
+ Clients uint64 `json:"clients"` // Clients is how many Clients were slow consumers
+ Routes uint64 `json:"routes"` // Routes is how many Routes were slow consumers
+ Gateways uint64 `json:"gateways"` // Gateways is how many Gateways were slow consumers
+ Leafs uint64 `json:"leafs"` // Leafs is how many Leafnodes were slow consumers
}
func myUptime(d time.Duration) string {
@@ -1431,6 +1432,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
a.last { padding-bottom: 16px }
a.version { font-size: 14; font-weight: 400; width: 312px; text-align: right; margin-top: -2rem }
a.version:hover { color: rgb(22 22 32) }
+ .endpoint { font-size: 12px; color: #999; font-family: monospace; display: none }
+ a:hover .endpoint { display: inline }
@@ -1441,33 +1444,33 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
- General
- JetStream
- Connections
- Accounts
- Account Stats
- Subscriptions
- Routes
- LeafNodes
- Gateways
- Raft Groups
- Health Probe
+ General %s
+ JetStream %s
+ Connections %s
+ Accounts %s
+ Account Stats %s
+ Subscriptions %s
+ Routes %s
+ LeafNodes %s
+ Gateways %s
+ Raft Groups %s
+ Health Probe %s
Help