Merge pull request #1339 from opencloud-eu/dependabot/go_modules/github.com/nats-io/nats-server/v2-2.11.7

build(deps): bump github.com/nats-io/nats-server/v2 from 2.11.6 to 2.11.7
This commit is contained in:
Ralf Haferkamp
2025-08-14 08:49:26 +02:00
committed by GitHub
23 changed files with 886 additions and 578 deletions

2
go.mod
View File

@@ -55,7 +55,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.6
github.com/nats-io/nats-server/v2 v2.11.7
github.com/nats-io/nats.go v1.43.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.0.8

4
go.sum
View File

@@ -821,8 +821,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.6 h1:4VXRjbTUFKEB+7UoaKL3F5Y83xC7MxPoIONOnGgpkHw=
github.com/nats-io/nats-server/v2 v2.11.6/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs=
github.com/nats-io/nats-server/v2 v2.11.7 h1:lINWQ/Hb3cnaoHmWTjj/7WppZnaSh9C/1cD//nHCbms=
github.com/nats-io/nats-server/v2 v2.11.7/go.mod h1:DchDPVzAsAPqhqm7VLedX0L7hjnV/SYtlmsl9F8U53s=
github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug=
github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=

View File

@@ -4360,7 +4360,7 @@ func sliceHeader(key string, hdr []byte) []byte {
if len(hdr) == 0 {
return nil
}
index := bytes.Index(hdr, stringToBytes(key))
index := bytes.Index(hdr, stringToBytes(key+":"))
hdrLen := len(hdr)
// Check that we have enough characters, this will handle the -1 case of the key not
// being found and will also handle not having enough characters for trailing CRLF.

View File

@@ -58,7 +58,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.11.6"
VERSION = "2.11.7"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -437,7 +437,8 @@ type consumer struct {
rdqi avl.SequenceSet
rdc map[uint64]uint64
replies map[uint64]string
pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum.
pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum.
waitingDeliveries map[string]*waitingDelivery // (Optional) request timeout messages that need to wait for replicated deliveries first.
maxdc uint64
waiting *waitQueue
cfg ConsumerConfig
@@ -819,6 +820,9 @@ func checkConsumerCfg(
}
if config.PriorityPolicy != PriorityNone {
if config.DeliverSubject != "" {
return NewJSConsumerPushWithPriorityGroupError()
}
if len(config.PriorityGroups) == 0 {
return NewJSConsumerPriorityPolicyWithoutGroupError()
}
@@ -1846,7 +1850,7 @@ func (o *consumer) deleteNotActive() {
} else {
// Pull mode.
elapsed := time.Since(o.waiting.last)
if elapsed <= o.cfg.InactiveThreshold {
if elapsed < o.dthresh {
// These need to keep firing so reset but use delta.
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh - elapsed)
@@ -1866,6 +1870,43 @@ func (o *consumer) deleteNotActive() {
o.mu.Unlock()
return
}
// We now know we have no waiting requests, and our last request was long ago.
// However, based on AckWait the consumer could still be actively processing,
// even if we haven't been informed if there were no acks in the meantime.
// We must wait for the message that expires last and start counting down the
// inactive threshold from there.
now := time.Now().UnixNano()
l := len(o.cfg.BackOff)
var delay time.Duration
var ackWait time.Duration
for _, p := range o.pending {
if l == 0 {
ackWait = o.ackWait(0)
} else {
bi := int(o.rdc[p.Sequence])
if bi < 0 {
bi = 0
} else if bi >= l {
bi = l - 1
}
ackWait = o.ackWait(o.cfg.BackOff[bi])
}
if ts := p.Timestamp + ackWait.Nanoseconds() + o.dthresh.Nanoseconds(); ts > now {
delay = max(delay, time.Duration(ts-now))
}
}
// We'll wait for the latest time we expect an ack, plus the inactive threshold.
// Acknowledging a message will reset this back down to just the inactive threshold.
if delay > 0 {
if o.dtmr != nil {
o.dtmr.Reset(delay)
} else {
o.dtmr = time.AfterFunc(delay, o.deleteNotActive)
}
o.mu.Unlock()
return
}
}
s, js := o.mset.srv, o.srv.js.Load()
@@ -2540,11 +2581,23 @@ func (o *consumer) addAckReply(sseq uint64, reply string) {
// Used to remember messages that need to be sent for a replicated consumer, after delivered quorum.
// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
	// Queue the message by stream sequence until quorum on delivered state is reached.
	// Is not explicitly limited in size, but will at most hold maximum ack pending.
	if o.pendingDeliveries == nil {
		o.pendingDeliveries = make(map[uint64]*jsPubMsg)
	}
	o.pendingDeliveries[pmsg.seq] = pmsg
	// Track, per delivery subject, the latest queued sequence so that a pending
	// request timeout can wait for replicated deliveries to go out first.
	// Is not explicitly limited in size, but will at most hold maximum waiting requests.
	if o.waitingDeliveries == nil {
		o.waitingDeliveries = make(map[string]*waitingDelivery)
	}
	if wd, ok := o.waitingDeliveries[pmsg.dsubj]; ok {
		// Already tracking this subject, just bump to the newest sequence.
		wd.seq = pmsg.seq
	} else {
		wd := wdPool.Get().(*waitingDelivery)
		wd.seq = pmsg.seq
		o.waitingDeliveries[pmsg.dsubj] = wd
	}
}
// Lock should be held.
@@ -3446,6 +3499,28 @@ func (wr *waitingRequest) recycle() {
}
}
// Represents an (optional) request timeout that's sent after waiting for replicated deliveries.
type waitingDelivery struct {
	seq uint64
	pn  int // Pending messages.
	pb  int // Pending bytes.
}

// sync.Pool for waiting deliveries.
var wdPool = sync.Pool{
	New: func() any { return &waitingDelivery{} },
}

// Force a recycle.
func (wd *waitingDelivery) recycle() {
	if wd == nil {
		return
	}
	// Zero all fields before returning to the pool so the next Get starts clean.
	wd.seq = 0
	wd.pn = 0
	wd.pb = 0
	wdPool.Put(wd)
}
// waiting queue for requests that are waiting for new messages to arrive.
type waitQueue struct {
n, max int
@@ -3721,8 +3796,19 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
} else {
// We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there.
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
rdWait := o.replicateDeliveries()
if rdWait {
// Check if we need to send the timeout after pending replicated deliveries, or can do so immediately.
if wd, ok := o.waitingDeliveries[wr.reply]; !ok {
rdWait = false
} else {
wd.pn, wd.pb = wr.n, wr.b
}
}
if !rdWait {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
o.waiting.removeCurrent()
if o.node != nil {
o.removeClusterPendingRequest(wr.reply)
@@ -4187,8 +4273,19 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
for wr := wq.head; wr != nil; {
// Check expiration.
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
rdWait := o.replicateDeliveries()
if rdWait {
// Check if we need to send the timeout after pending replicated deliveries, or can do so immediately.
if wd, ok := o.waitingDeliveries[wr.reply]; !ok {
rdWait = false
} else {
wd.pn, wd.pb = wr.n, wr.b
}
}
if !rdWait {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
wr = remove(pre, wr)
continue
}
@@ -4368,16 +4465,19 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
for {
select {
case <-o.ackMsgs.ch:
// If we have an inactiveThreshold set, mark our activity.
// Do this before processing acks, otherwise we might race if there are no pending messages
// anymore and the inactivity threshold kicks in before we're able to mark activity.
if hasInactiveThresh {
o.suppressDeletion()
}
acks := o.ackMsgs.pop()
for _, ack := range acks {
o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg)
ack.returnToPool()
}
o.ackMsgs.recycle(&acks)
// If we have an inactiveThreshold set, mark our activity.
if hasInactiveThresh {
o.suppressDeletion()
}
case <-ticker.C:
o.checkAckFloor()
case <-qch:
@@ -4425,8 +4525,11 @@ func (o *consumer) suppressDeletion() {
// if dtmr is not nil we have started the countdown, simply reset to threshold.
o.dtmr.Reset(o.dthresh)
} else if o.isPullMode() && o.waiting != nil {
// Pull mode always has timer running, just update last on waiting queue.
// Pull mode always has timer running, update last on waiting queue.
o.waiting.last = time.Now()
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh)
}
}
}
@@ -4485,7 +4588,6 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
delay time.Duration
sz int
wrn, wrb int
wrNoWait bool
)
o.mu.Lock()
@@ -4564,7 +4666,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if o.isPushMode() {
dsubj = o.dsubj
} else if wr := o.nextWaiting(sz); wr != nil {
wrn, wrb, wrNoWait = wr.n, wr.b, wr.noWait
wrn, wrb = wr.n, wr.b
dsubj = wr.reply
if o.cfg.PriorityPolicy == PriorityPinnedClient {
// FIXME(jrm): Can we make this prettier?
@@ -4639,7 +4741,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
// Do actual delivery.
o.deliverMsg(dsubj, ackReply, pmsg, dc, rp, wrNoWait)
o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)
// If given request fulfilled batch size, but there are still pending bytes, send information about it.
if wrn <= 0 && wrb > 0 {
@@ -4838,7 +4940,7 @@ func convertToHeadersOnly(pmsg *jsPubMsg) {
// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy, wrNoWait bool) {
func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) {
if o.mset == nil {
pmsg.returnToPool()
return
@@ -4871,15 +4973,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
}
// Send message.
// If we're replicated we MUST only send the message AFTER we've got quorum for updating
// delivered state. Otherwise, we could be in an invalid state after a leader change.
// We can send immediately if not replicated, not using acks, or using flow control (incompatible).
// TODO(mvv): If NoWait we also bypass replicating first.
// Ideally we'd only send the NoWait request timeout after replication and delivery.
if o.node == nil || ap == AckNone || o.cfg.FlowControl || wrNoWait {
o.outq.send(pmsg)
} else {
if o.replicateDeliveries() {
o.addReplicatedQueuedMsg(pmsg)
} else {
o.outq.send(pmsg)
}
// Flow control.
@@ -4902,6 +4999,15 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
}
}
// replicateDeliveries returns whether deliveries should be replicated before sending them.
// If we're replicated we MUST only send the message AFTER we've got quorum for updating
// delivered state. Otherwise, we could be in an invalid state after a leader change.
// We can send immediately if not replicated, not using acks, or using flow control (incompatible).
// Lock should be held.
func (o *consumer) replicateDeliveries() bool {
return o.node != nil && o.cfg.AckPolicy != AckNone && !o.cfg.FlowControl
}
func (o *consumer) needFlowControl(sz int) bool {
if o.maxpb == 0 {
return false
@@ -6148,4 +6254,8 @@ func (o *consumer) resetPendingDeliveries() {
pmsg.returnToPool()
}
o.pendingDeliveries = nil
for _, wd := range o.waitingDeliveries {
wd.recycle()
}
o.waitingDeliveries = nil
}

View File

@@ -1658,5 +1658,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerPushWithPriorityGroupErr",
"code": 400,
"error_code": 10178,
"description": "priority groups can not be used with push consumers",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]

View File

@@ -1401,10 +1401,9 @@ func (s *Server) initEventTracking() {
}
}
// User info.
// TODO(dlc) - Can be internal and not forwarded since bound server for the client connection
// is only one that will answer. This breaks tests since we still forward on remote server connect.
if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
// User info. Do not propagate interest so that we know the local server to the connection
// is the only one that will answer the requests.
if _, err := s.sysSubscribeInternal(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}

View File

@@ -306,8 +306,8 @@ const (
consumerDir = "obs"
// Index file for a consumer.
consumerState = "o.dat"
// The suffix that will be given to a new temporary block during compression.
compressTmpSuffix = ".tmp"
// The suffix that will be given to a new temporary block for compression or when rewriting the full file.
blkTmpSuffix = ".tmp"
// This is where we keep state on templates.
tmplsDir = "templates"
// Maximum size of a write buffer we may consider for re-use.
@@ -651,7 +651,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
// Create or delete the THW if needed.
if cfg.AllowMsgTTL && fs.ttls == nil {
fs.ttls = thw.NewHashWheel()
fs.recoverTTLState()
} else if !cfg.AllowMsgTTL && fs.ttls != nil {
fs.ttls = nil
}
@@ -1201,7 +1201,9 @@ func (fs *fileStore) rebuildStateLocked(ld *LostStreamData) {
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
}
if lseq := atomic.LoadUint64(&mb.last.seq); lseq > fs.state.LastSeq {
// Preserve last time, could have erased the last message in one block, and then
// have a tombstone with the proper timestamp afterward in another block
if lseq := atomic.LoadUint64(&mb.last.seq); lseq >= fs.state.LastSeq {
fs.state.LastSeq = lseq
if mb.last.ts == 0 {
fs.state.LastTime = time.Time{}
@@ -1271,10 +1273,13 @@ func (mb *msgBlock) convertCipher() error {
buf, _ := mb.loadBlock(nil)
bek.XORKeyStream(buf, buf)
// Make sure we can parse with old cipher and key file.
if err = mb.indexCacheBuf(buf); err != nil {
// Check for compression, and make sure we can parse with old cipher and key file.
if nbuf, err := mb.decompressIfNeeded(buf); err != nil {
return err
} else if err = mb.indexCacheBuf(nbuf); err != nil {
return err
}
// Reset the cache since we just read everything in.
mb.cache = nil
@@ -1306,6 +1311,10 @@ func (mb *msgBlock) convertToEncrypted() error {
if err != nil {
return err
}
// Check for compression.
if buf, err = mb.decompressIfNeeded(buf); err != nil {
return err
}
if err := mb.indexCacheBuf(buf); err != nil {
// This likely indicates this was already encrypted or corrupt.
mb.cache = nil
@@ -1376,15 +1385,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
firstNeedsSet := true
// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
// Recreate to reset counter.
mb.bek, err = genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return nil, nil, err
}
mb.bek.XORKeyStream(buf, buf)
if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
return nil, nil, err
}
// Check for compression.
if buf, err = mb.decompressIfNeeded(buf); err != nil {
return nil, nil, err
@@ -1799,6 +1802,11 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}
bi += n
}
// Pre-emptively mark block as closed, we'll confirm this block
// still exists on disk and report it as lost if not.
mb.closed = true
// Only add in if not empty or the lmb.
if mb.msgs > 0 || i == lastIndex {
fs.addMsgBlock(mb)
@@ -1873,10 +1881,26 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if index > blkIndex {
fs.warn("Stream state outdated, found extra blocks, will rebuild")
return errPriorState
} else if mb, ok := fs.bim[index]; ok {
mb.closed = false
}
}
}
var rebuild bool
for _, mb := range fs.blks {
if mb.closed {
rebuild = true
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", mb.index)
}
}
if rebuild {
return errPriorState
}
// We check first and last seq and number of msgs and bytes. If there is a difference,
// return and error so we rebuild from the message block state on disk.
if !trackingStatesEqual(&fs.state, &mstate) {
@@ -1888,6 +1912,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return nil
}
// Lock should be held.
func (fs *fileStore) recoverTTLState() error {
// See if we have a timed hash wheel for TTLs.
<-dios
@@ -1917,31 +1942,34 @@ func (fs *fileStore) recoverTTLState() error {
defer fs.resetAgeChk(0)
if fs.state.Msgs > 0 && ttlseq <= fs.state.LastSeq {
fs.warn("TTL state is outdated; attempting to recover using linear scan (seq %d to %d)", ttlseq, fs.state.LastSeq)
var sm StoreMsg
mb := fs.selectMsgBlock(ttlseq)
if mb == nil {
return nil
}
mblseq := atomic.LoadUint64(&mb.last.seq)
var (
mb *msgBlock
sm StoreMsg
mblseq uint64
)
for seq := ttlseq; seq <= fs.state.LastSeq; seq++ {
retry:
if mb == nil {
if mb = fs.selectMsgBlock(seq); mb == nil {
// Selecting the message block should return a block that contains this sequence,
// or a later block if it can't be found.
// It's an error if we can't find any block within the bounds of first and last seq.
fs.warn("Error loading msg block with seq %d for recovering TTL: %s", seq)
continue
}
seq = atomic.LoadUint64(&mb.first.seq)
mblseq = atomic.LoadUint64(&mb.last.seq)
}
if mb.ttls == 0 {
// None of the messages in the block have message TTLs so don't
// bother doing anything further with this block, skip to the end.
seq = atomic.LoadUint64(&mb.last.seq) + 1
}
if seq > mblseq {
// We've reached the end of the loaded block, see if we can continue
// by loading the next one.
// We've reached the end of the loaded block, so let's go back to the
// beginning and process the next block.
mb.tryForceExpireCache()
if mb = fs.selectMsgBlock(seq); mb == nil {
// TODO(nat): Deal with gaps properly. Right now this will be
// probably expensive on CPU.
continue
}
mblseq = atomic.LoadUint64(&mb.last.seq)
// At this point we've loaded another block, so let's go back to the
// beginning and see if we need to skip this one too.
mb = nil
goto retry
}
msg, _, err := mb.fetchMsgNoCopy(seq, &sm)
@@ -1983,12 +2011,9 @@ func (mb *msgBlock) lastChecksum() []byte {
}
if mb.bek != nil {
if buf, _ := mb.loadBlock(nil); len(buf) >= checksumSize {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
return nil
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
copy(lchk[0:], buf[len(buf)-checksumSize:])
}
} else {
@@ -2082,7 +2107,9 @@ func (fs *fileStore) recoverMsgs() error {
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
}
if lseq := atomic.LoadUint64(&mb.last.seq); lseq > fs.state.LastSeq {
// Preserve last time, could have erased the last message in one block, and then
// have a tombstone with the proper timestamp afterward in another block
if lseq := atomic.LoadUint64(&mb.last.seq); lseq >= fs.state.LastSeq {
fs.state.LastSeq = lseq
if mb.last.ts == 0 {
fs.state.LastTime = time.Time{}
@@ -3935,16 +3962,27 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
var rbuf []byte
if lmb := fs.lmb; lmb != nil {
lmb.mu.Lock()
index = lmb.index + 1
// Flush any pending messages.
lmb.flushPendingMsgsLocked()
// Determine if we can reclaim any resources here.
if fs.fip {
lmb.mu.Lock()
lmb.closeFDsLocked()
if lmb.cache != nil {
// Reset write timestamp and see if we can expire this cache.
rbuf = lmb.tryExpireWriteCache()
}
lmb.mu.Unlock()
lmb.closeFDsLockedNoCheck()
if lmb.cache != nil {
// Reset write timestamp and see if we can expire this cache.
rbuf = lmb.tryExpireWriteCache()
}
lmb.mu.Unlock()
if fs.fcfg.Compression != NoCompression {
// We've now reached the end of this message block, if we want
// to compress blocks then now's the time to do it.
go func() {
lmb.mu.Lock()
defer lmb.mu.Unlock()
lmb.recompressOnDiskIfNeeded()
}()
}
}
@@ -4208,20 +4246,19 @@ func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64,
// we will place an empty record marking the sequence as used. The
// sequence will be marked erased.
// fs lock should be held.
func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
func (mb *msgBlock) skipMsg(seq uint64, now int64) {
if mb == nil {
return
}
var needsRecord bool
nowts := ats.AccessTime()
mb.mu.Lock()
// If we are empty can just do meta.
if mb.msgs == 0 {
atomic.StoreUint64(&mb.last.seq, seq)
mb.last.ts = nowts
mb.last.ts = now
atomic.StoreUint64(&mb.first.seq, seq+1)
mb.first.ts = nowts
mb.first.ts = 0
needsRecord = mb == mb.fs.lmb
if needsRecord && mb.rbytes > 0 {
// We want to make sure since we have no messages
@@ -4240,7 +4277,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
mb.mu.Unlock()
if needsRecord {
mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, nowts, true)
mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, now, true)
} else {
mb.kickFlusher()
}
@@ -4258,18 +4295,18 @@ func (fs *fileStore) SkipMsg() uint64 {
}
// Grab time and last seq.
now, seq := time.Now(), fs.state.LastSeq+1
now, seq := ats.AccessTime(), fs.state.LastSeq+1
// Write skip msg.
mb.skipMsg(seq, now)
// Update fs state.
fs.state.LastSeq, fs.state.LastTime = seq, now
fs.state.LastSeq, fs.state.LastTime = seq, time.Unix(0, now).UTC()
if fs.state.Msgs == 0 {
fs.state.FirstSeq, fs.state.FirstTime = seq, now
fs.state.FirstSeq, fs.state.FirstTime = seq, time.Time{}
}
if seq == fs.state.FirstSeq {
fs.state.FirstSeq, fs.state.FirstTime = seq+1, now
fs.state.FirstSeq, fs.state.FirstTime = seq+1, time.Time{}
}
// Mark as dirty for stream state.
fs.dirty++
@@ -4299,11 +4336,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
numDeletes += mb.dmap.Size()
}
if mb == nil || numDeletes > maxDeletes && mb.msgs > 0 || mb.msgs > 0 && mb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize {
if mb != nil && fs.fcfg.Compression != NoCompression {
// We've now reached the end of this message block, if we want
// to compress blocks then now's the time to do it.
go mb.recompressOnDiskIfNeeded()
}
var err error
if mb, err = fs.newMsgBlockForWrite(); err != nil {
return err
@@ -4311,17 +4343,16 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
}
// Insert into dmap all entries and place last as marker.
now := time.Now()
nowts := now.UnixNano()
now := ats.AccessTime()
lseq := seq + num - 1
mb.mu.Lock()
// If we are empty update meta directly.
if mb.msgs == 0 {
atomic.StoreUint64(&mb.last.seq, lseq)
mb.last.ts = nowts
mb.last.ts = now
atomic.StoreUint64(&mb.first.seq, lseq+1)
mb.first.ts = nowts
mb.first.ts = 0
} else {
for ; seq <= lseq; seq++ {
mb.dmap.Insert(seq)
@@ -4330,13 +4361,13 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
mb.mu.Unlock()
// Write out our placeholder.
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, now, true)
// Now update FS accounting.
// Update fs state.
fs.state.LastSeq, fs.state.LastTime = lseq, now
fs.state.LastSeq, fs.state.LastTime = lseq, time.Unix(0, now).UTC()
if fs.state.Msgs == 0 {
fs.state.FirstSeq, fs.state.FirstTime = lseq+1, now
fs.state.FirstSeq, fs.state.FirstTime = lseq+1, time.Time{}
}
// Mark as dirty for stream state.
@@ -4759,18 +4790,19 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
if secure {
// Grab record info.
ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq))
if err := mb.eraseMsg(seq, int(ri), int(rl)); err != nil {
return false, err
}
}
fifo := seq == atomic.LoadUint64(&mb.first.seq)
isLastBlock := mb == fs.lmb
isEmpty := mb.msgs == 0
// If erase but block is empty, we can simply remove the block later.
if secure && !isEmpty {
// Grab record info.
ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq))
if err := mb.eraseMsg(seq, int(ri), int(rl), isLastBlock); err != nil {
return false, err
}
}
if fifo {
mb.selectNextFirst()
if !isEmpty {
@@ -5150,7 +5182,7 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) {
}
// Lock should be held.
func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error {
func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int, isLastBlock bool) error {
var le = binary.LittleEndian
var hdr [msgHdrSize]byte
@@ -5192,35 +5224,23 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error {
// Disk
if mb.cache.off+mb.cache.wp > ri {
<-dios
mfd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
dios <- struct{}{}
if err != nil {
return err
}
defer mfd.Close()
if _, err = mfd.WriteAt(nbytes, int64(ri)); err == nil {
mfd.Sync()
}
if err != nil {
if err := mb.atomicOverwriteFile(mb.cache.buf, !isLastBlock); err != nil {
return err
}
}
return nil
}
// Truncate this message block to the storedMsg.
func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
mb.mu.Lock()
defer mb.mu.Unlock()
// Truncate this message block to the tseq and ts.
// Lock should be held.
func (mb *msgBlock) truncate(tseq uint64, ts int64) (nmsgs, nbytes uint64, err error) {
// Make sure we are loaded to process messages etc.
if err := mb.loadMsgsWithLock(); err != nil {
return 0, 0, err
}
// Calculate new eof using slot info from our new last sm.
ri, rl, _, err := mb.slotInfo(int(sm.seq - mb.cache.fseq))
ri, rl, _, err := mb.slotInfo(int(tseq - mb.cache.fseq))
if err != nil {
return 0, 0, err
}
@@ -5232,7 +5252,7 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
checkDmap := mb.dmap.Size() > 0
var smv StoreMsg
for seq := atomic.LoadUint64(&mb.last.seq); seq > sm.seq; seq-- {
for seq := atomic.LoadUint64(&mb.last.seq); seq > tseq; seq-- {
if checkDmap {
if mb.dmap.Exists(seq) {
// Delete and skip to next.
@@ -5266,46 +5286,19 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
if err != nil {
return 0, 0, fmt.Errorf("failed to load block from disk: %w", err)
}
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return 0, 0, err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
return 0, 0, err
}
buf, err = mb.decompressIfNeeded(buf)
if err != nil {
if buf, err = mb.decompressIfNeeded(buf); err != nil {
return 0, 0, fmt.Errorf("failed to decompress block: %w", err)
}
buf = buf[:eof]
copy(mb.lchk[0:], buf[:len(buf)-checksumSize])
buf, err = mb.cmp.Compress(buf)
if err != nil {
return 0, 0, fmt.Errorf("failed to recompress block: %w", err)
// We did decompress but don't recompress the truncated buffer here since we're the last block
// and would otherwise have compressed data and allow to write uncompressed data in the same block.
if err = mb.atomicOverwriteFile(buf, false); err != nil {
return 0, 0, err
}
meta := &CompressionInfo{
Algorithm: mb.cmp,
OriginalSize: uint64(eof),
}
buf = append(meta.MarshalMetadata(), buf...)
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return 0, 0, err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
}
n, err := mb.writeAt(buf, 0)
if err != nil {
return 0, 0, fmt.Errorf("failed to rewrite compressed block: %w", err)
}
if n != len(buf) {
return 0, 0, fmt.Errorf("short write (%d != %d)", n, len(buf))
}
mb.mfd.Truncate(int64(len(buf)))
mb.mfd.Sync()
} else if mb.mfd != nil {
mb.mfd.Truncate(eof)
mb.mfd.Sync()
@@ -5318,8 +5311,8 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
}
// Update our last msg.
atomic.StoreUint64(&mb.last.seq, sm.seq)
mb.last.ts = sm.ts
atomic.StoreUint64(&mb.last.seq, tseq)
mb.last.ts = ts
// Clear our cache.
mb.clearCacheAndOffset()
@@ -5383,7 +5376,11 @@ func (fs *fileStore) selectNextFirst() {
mb := fs.blks[0]
mb.mu.RLock()
fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq)
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
if mb.first.ts == 0 {
fs.state.FirstTime = time.Time{}
} else {
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
mb.mu.RUnlock()
} else {
// Could not find anything, so treat like purge
@@ -5649,7 +5646,7 @@ func (fs *fileStore) expireMsgs() {
// if it was the last message of that particular subject that we just deleted.
if sdmEnabled {
if last, ok := fs.shouldProcessSdm(seq, sm.subj); ok {
sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0
sdm := last && isSubjectDeleteMarker(sm.hdr)
fs.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
@@ -5682,7 +5679,7 @@ func (fs *fileStore) expireMsgs() {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0})
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)})
} else {
// Collect sequences to remove. Don't remove messages inline here,
// as that releases the lock and THW is not thread-safe.
@@ -6113,14 +6110,6 @@ func (fs *fileStore) checkLastBlock(rl uint64) (lmb *msgBlock, err error) {
lmb = fs.lmb
rbytes := lmb.blkSize()
if lmb == nil || (rbytes > 0 && rbytes+rl > fs.fcfg.BlockSize) {
if lmb != nil {
lmb.flushPendingMsgs()
if fs.fcfg.Compression != NoCompression {
// We've now reached the end of this message block, if we want
// to compress blocks then now's the time to do it.
go lmb.recompressOnDiskIfNeeded()
}
}
if lmb, err = fs.newMsgBlockForWrite(); err != nil {
return nil, err
}
@@ -6173,13 +6162,9 @@ func (fs *fileStore) writeTombstoneNoFlush(seq uint64, ts int64) error {
return lmb.writeTombstoneNoFlush(seq, ts)
}
// Lock should be held.
func (mb *msgBlock) recompressOnDiskIfNeeded() error {
alg := mb.fs.fcfg.Compression
mb.mu.Lock()
defer mb.mu.Unlock()
origFN := mb.mfn // The original message block on disk.
tmpFN := mb.mfn + compressTmpSuffix // The compressed block will be written here.
// Open up the file block and read in the entire contents into memory.
// One of two things will happen:
@@ -6188,7 +6173,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// 2. The block will be uncompressed, in which case we will compress it
// and then write it back out to disk, re-encrypting if necessary.
<-dios
origBuf, err := os.ReadFile(origFN)
origBuf, err := os.ReadFile(mb.mfn)
dios <- struct{}{}
if err != nil {
@@ -6200,13 +6185,8 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// compression can be as efficient as possible on the raw data, whereas
// the encrypted ciphertext will not compress anywhere near as well.
// The block encryption also covers the optional compression metadata.
if mb.bek != nil && len(origBuf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return err
}
mb.bek = bek
mb.bek.XORKeyStream(origBuf, origBuf)
if err = mb.encryptOrDecryptIfNeeded(origBuf); err != nil {
return err
}
meta := &CompressionInfo{}
@@ -6231,6 +6211,19 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
}
}
return mb.atomicOverwriteFile(origBuf, true)
}
// Lock should be held.
func (mb *msgBlock) atomicOverwriteFile(buf []byte, allowCompress bool) error {
if mb.mfd != nil {
mb.closeFDsLockedNoCheck()
defer mb.enableForWriting(mb.fs.fip)
}
origFN := mb.mfn // The original message block on disk.
tmpFN := mb.mfn + blkTmpSuffix // The new block will be written here.
// Rather than modifying the existing block on disk (which is a dangerous
// operation if something goes wrong), create a new temporary file. We will
// write out the new block here and then swap the files around afterwards
@@ -6249,41 +6242,37 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
return err
}
// The original buffer at this point is uncompressed, so we will now compress
// it if needed. Note that if the selected algorithm is NoCompression, the
// Compress function will just return the input buffer unmodified.
cmpBuf, err := alg.Compress(origBuf)
if err != nil {
return errorCleanup(fmt.Errorf("failed to compress block: %w", err))
}
alg := NoCompression
if calg := mb.fs.fcfg.Compression; calg != NoCompression && allowCompress {
alg = calg
// The original buffer at this point is uncompressed, so we will now compress
// it if needed. Note that if the selected algorithm is NoCompression, the
// Compress function will just return the input buffer unmodified.
if buf, err = alg.Compress(buf); err != nil {
return errorCleanup(fmt.Errorf("failed to compress block: %w", err))
}
// We only need to write out the metadata header if compression is enabled.
// If we're trying to uncompress the file on disk at this point, don't bother
// writing metadata.
if alg != NoCompression {
// We only need to write out the metadata header if compression is enabled.
// If we're trying to uncompress the file on disk at this point, don't bother
// writing metadata.
meta := &CompressionInfo{
Algorithm: alg,
OriginalSize: uint64(len(origBuf)),
OriginalSize: uint64(len(buf)),
}
cmpBuf = append(meta.MarshalMetadata(), cmpBuf...)
buf = append(meta.MarshalMetadata(), buf...)
}
// Re-encrypt the block if necessary.
if mb.bek != nil && len(cmpBuf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return errorCleanup(err)
}
mb.bek = bek
mb.bek.XORKeyStream(cmpBuf, cmpBuf)
if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
return errorCleanup(err)
}
// Write the new block data (which might be compressed or encrypted) to the
// temporary file.
if n, err := tmpFD.Write(cmpBuf); err != nil {
if n, err := tmpFD.Write(buf); err != nil {
return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err))
} else if n != len(cmpBuf) {
return errorCleanup(fmt.Errorf("short write to temporary file (%d != %d)", n, len(cmpBuf)))
} else if n != len(buf) {
return errorCleanup(fmt.Errorf("short write to temporary file (%d != %d)", n, len(buf)))
}
if err := tmpFD.Sync(); err != nil {
return errorCleanup(fmt.Errorf("failed to sync temporary file: %w", err))
@@ -6303,11 +6292,12 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
mb.cmp = alg
// Also update rbytes
mb.rbytes = uint64(len(cmpBuf))
mb.rbytes = uint64(len(buf))
return nil
}
// Lock should be held.
func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
var meta CompressionInfo
if n, err := meta.UnmarshalMetadata(buf); err != nil {
@@ -6328,6 +6318,19 @@ func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
}
}
// encryptOrDecryptIfNeeded applies the block cipher keystream to buf in
// place. Since the stream cipher XOR is symmetric, the same operation both
// encrypts plaintext and decrypts ciphertext. It is a no-op when the block
// has no encryption key configured or buf is empty. A fresh keystream is
// generated on every call so the XOR always starts at offset zero.
// Lock should be held.
func (mb *msgBlock) encryptOrDecryptIfNeeded(buf []byte) error {
	// Nothing to do without a key or without data.
	if mb.bek == nil || len(buf) == 0 {
		return nil
	}
	bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
	if err != nil {
		return err
	}
	mb.bek = bek
	mb.bek.XORKeyStream(buf, buf)
	return nil
}
// Lock should be held.
func (mb *msgBlock) ensureRawBytesLoaded() error {
if mb.rbytes > 0 {
@@ -6631,7 +6634,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// If we have a hole fill it.
for dseq := mbFirstSeq + uint64(len(idx)); dseq < seq; dseq++ {
idx = append(idx, dbit)
if dms == 0 {
if dms == 0 && dseq != 0 {
mb.dmap.Insert(dseq)
}
}
@@ -6645,7 +6648,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
}
// Make sure our dmap has this entry if it was erased.
if erased && dms == 0 {
if erased && dms == 0 && seq != 0 {
mb.dmap.Insert(seq)
}
@@ -7009,15 +7012,9 @@ checkCache:
mb.clearCacheAndOffset()
// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
if err = mb.encryptOrDecryptIfNeeded(buf); err != nil {
return err
}
// Check for compression.
if buf, err = mb.decompressIfNeeded(buf); err != nil {
return err
@@ -8094,7 +8091,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
firstSeqNeedsUpdate = firstSeqNeedsUpdate || seq == fs.state.FirstSeq
} else if seq == fs.state.FirstSeq {
fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq) // new one.
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
if mb.first.ts == 0 {
fs.state.FirstTime = time.Time{}
} else {
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
}
} else {
// Out of order delete.
@@ -8143,7 +8144,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
return purged, err
}
}
// Flush any pending. If we change blocks the checkLastBlock() will flush any pending for us.
// Flush any pending. If we change blocks the newMsgBlockForWrite() will flush any pending for us.
if lmb := fs.lmb; lmb != nil {
lmb.flushPendingMsgs()
}
@@ -8435,8 +8436,17 @@ SKIP:
smb.mu.Unlock()
// Write any tombstones as needed.
for _, tomb := range tombs {
fs.writeTombstone(tomb.seq, tomb.ts)
// When writing multiple tombstones we will flush at the end.
if len(tombs) > 0 {
for _, tomb := range tombs {
if err := fs.writeTombstoneNoFlush(tomb.seq, tomb.ts); err != nil {
return purged, err
}
}
// Flush any pending. If we change blocks the newMsgBlockForWrite() will flush any pending for us.
if lmb := fs.lmb; lmb != nil {
lmb.flushPendingMsgs()
}
}
if deleted > 0 {
@@ -8601,55 +8611,118 @@ func (fs *fileStore) Truncate(seq uint64) error {
return ErrStoreSnapshotInProgress
}
nlmb := fs.selectMsgBlock(seq)
if nlmb == nil {
fs.mu.Unlock()
return ErrInvalidSequence
}
lsm, _, _ := nlmb.fetchMsgNoCopy(seq, nil)
if lsm == nil {
fs.mu.Unlock()
return ErrInvalidSequence
var lsm *StoreMsg
smb := fs.selectMsgBlock(seq)
if smb != nil {
lsm, _, _ = smb.fetchMsgNoCopy(seq, nil)
}
// Set lmb to nlmb and make sure writeable.
fs.lmb = nlmb
if err := nlmb.enableForWriting(fs.fip); err != nil {
fs.mu.Unlock()
// Reset last so new block doesn't contain truncated sequences/timestamps.
var lastTime int64
if lsm != nil {
lastTime = lsm.ts
} else if smb != nil {
lastTime = smb.last.ts
} else {
lastTime = fs.state.LastTime.UnixNano()
}
fs.state.LastSeq = seq
fs.state.LastTime = time.Unix(0, lastTime).UTC()
// Always create a new write block for any tombstones.
// We'll truncate the selected message block as the last step, so can't write tombstones to it.
// If we end up not needing to write tombstones, this block will be cleaned up at the end.
tmb, err := fs.newMsgBlockForWrite()
if err != nil {
return err
}
// Collect all tombstones, we want to put these back so we can survive
// a restore without index.db properly.
var tombs []msgId
tombs = append(tombs, nlmb.tombs()...)
// If the selected block is not found or the message was deleted, we'll need to write a tombstone
// at the truncated sequence so we don't roll backward on our last sequence and timestamp.
if lsm == nil {
fs.writeTombstone(seq, lastTime)
}
var purged, bytes uint64
// Truncate our new last message block.
nmsgs, nbytes, err := nlmb.truncate(lsm)
if err != nil {
fs.mu.Unlock()
return fmt.Errorf("nlmb.truncate: %w", err)
}
// Account for the truncated msgs and bytes.
purged += nmsgs
bytes += nbytes
// Remove any left over msg blocks.
getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] }
for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() {
getLastMsgBlock := func() *msgBlock {
// Start at one before last, tmb will be the last most of the time
// unless a new block gets added for tombstones.
for i := len(fs.blks) - 2; i >= 0; i-- {
if mb := fs.blks[i]; mb.index < tmb.index {
return mb
}
}
return nil
}
for mb := getLastMsgBlock(); mb != nil && mb != smb; mb = getLastMsgBlock() {
mb.mu.Lock()
// We do this to load tombs.
tombs = append(tombs, mb.tombsLocked()...)
purged += mb.msgs
bytes += mb.bytes
// We could have tombstones for messages before the truncated sequence.
// Need to store those for blocks we're about to remove.
if tombs := mb.tombsLocked(); len(tombs) > 0 {
// Temporarily unlock while we write tombstones.
mb.mu.Unlock()
for _, tomb := range tombs {
if tomb.seq < seq {
fs.writeTombstone(tomb.seq, tomb.ts)
}
}
mb.mu.Lock()
}
fs.removeMsgBlock(mb)
mb.mu.Unlock()
}
hasWrittenTombstones := len(tmb.tombs()) > 0
if smb != nil {
// Make sure writeable.
smb.mu.Lock()
if err := smb.enableForWriting(fs.fip); err != nil {
smb.mu.Unlock()
fs.mu.Unlock()
return err
}
// Truncate our selected message block.
nmsgs, nbytes, err := smb.truncate(seq, lastTime)
if err != nil {
smb.mu.Unlock()
fs.mu.Unlock()
return fmt.Errorf("smb.truncate: %w", err)
}
// Account for the truncated msgs and bytes.
purged += nmsgs
bytes += nbytes
// The selected message block is not the last anymore, need to close down resources.
if hasWrittenTombstones {
// Quit our loops.
if smb.qch != nil {
close(smb.qch)
smb.qch = nil
}
smb.closeFDsLockedNoCheck()
smb.recompressOnDiskIfNeeded()
}
smb.mu.Unlock()
}
// If no tombstones were written, we can remove the block and
// purely rely on the selected block as the last block.
if !hasWrittenTombstones {
fs.lmb = smb
tmb.mu.Lock()
fs.removeMsgBlock(tmb)
tmb.mu.Unlock()
}
// Reset last.
fs.state.LastSeq = lsm.seq
fs.state.LastTime = time.Unix(0, lsm.ts).UTC()
fs.state.LastSeq = seq
fs.state.LastTime = time.Unix(0, lastTime).UTC()
// Update msgs and bytes.
if purged > fs.state.Msgs {
purged = fs.state.Msgs
@@ -8663,16 +8736,6 @@ func (fs *fileStore) Truncate(seq uint64) error {
// Reset our subject lookup info.
fs.resetGlobalPerSubjectInfo()
// Always create new write block.
fs.newMsgBlockForWrite()
// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq <= lsm.seq {
fs.writeTombstone(tomb.seq, tomb.ts)
}
}
// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))

View File

@@ -40,15 +40,15 @@ import (
// JetStreamConfig determines this server's configuration.
// MaxMemory and MaxStore are in bytes.
type JetStreamConfig struct {
MaxMemory int64 `json:"max_memory"`
MaxStore int64 `json:"max_storage"`
StoreDir string `json:"store_dir,omitempty"`
SyncInterval time.Duration `json:"sync_interval,omitempty"`
SyncAlways bool `json:"sync_always,omitempty"`
Domain string `json:"domain,omitempty"`
CompressOK bool `json:"compress_ok,omitempty"`
UniqueTag string `json:"unique_tag,omitempty"`
Strict bool `json:"strict,omitempty"`
MaxMemory int64 `json:"max_memory"` // MaxMemory is the maximum size of memory type streams
MaxStore int64 `json:"max_storage"` // MaxStore is the maximum size of file store type streams
StoreDir string `json:"store_dir,omitempty"` // StoreDir is where storage files are stored
SyncInterval time.Duration `json:"sync_interval,omitempty"` // SyncInterval is how frequently we sync to disk in the background by calling fsync
SyncAlways bool `json:"sync_always,omitempty"` // SyncAlways indicates flushes are done after every write
Domain string `json:"domain,omitempty"` // Domain is the JetStream domain
CompressOK bool `json:"compress_ok,omitempty"` // CompressOK indicates if compression is supported
UniqueTag string `json:"unique_tag,omitempty"` // UniqueTag is the unique tag assigned to this instance
Strict bool `json:"strict,omitempty"` // Strict indicates if strict JSON parsing is performed
}
// Statistics about JetStream for this server.
@@ -91,11 +91,12 @@ type JetStreamAccountStats struct {
Tiers map[string]JetStreamTier `json:"tiers,omitempty"` // indexed by tier name
}
// JetStreamAPIStats holds stats about the API usage for this server
type JetStreamAPIStats struct {
Level int `json:"level"`
Total uint64 `json:"total"`
Errors uint64 `json:"errors"`
Inflight uint64 `json:"inflight,omitempty"`
Level int `json:"level"` // Level is the active API level this server implements
Total uint64 `json:"total"` // Total is the total API requests received since start
Errors uint64 `json:"errors"` // Errors is the total API requests that resulted in error responses
Inflight uint64 `json:"inflight,omitempty"` // Inflight are the number of API requests currently being served
}
// This is for internal accounting for JetStream for this server.

View File

@@ -1225,6 +1225,8 @@ func (js *jetStream) monitorCluster() {
doSnapshot()
return
case <-rqch:
// Clean signal from shutdown routine so do best effort attempt to snapshot meta layer.
doSnapshot()
return
case <-qch:
// Clean signal from shutdown routine so do best effort attempt to snapshot meta layer.
@@ -1280,10 +1282,10 @@ func (js *jetStream) monitorCluster() {
} else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
}
ce.ReturnToPool()
} else {
s.Warnf("Error applying JetStream cluster entries: %v", err)
}
ce.ReturnToPool()
}
aq.recycle(&ces)
@@ -2455,6 +2457,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
ne, nb = n.Applied(ce.Index)
ce.ReturnToPool()
} else {
// Make sure to clean up.
ce.ReturnToPool()
// Our stream was closed out from underneath of us, simply return here.
if err == errStreamClosed || err == errCatchupStreamStopped || err == ErrServerNotRunning {
aq.recycle(&ces)
@@ -4862,13 +4866,14 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if n.NeedSnapshot() {
doSnapshot(true)
}
} else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
continue
}
if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
var ne, nb uint64
// We can't guarantee writes are flushed while we're shutting down. Just rely on replay during recovery.
if !js.isShuttingDown() {
ne, nb = n.Applied(ce.Index)
}
ce.ReturnToPool()
// If we have at least min entries to compact, go ahead and snapshot/compact.
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
doSnapshot(false)
@@ -4876,6 +4881,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
} else if err != errConsumerClosed {
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
}
ce.ReturnToPool()
}
aq.recycle(&ces)
@@ -5058,8 +5064,20 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
o.ldt = time.Now()
// Need to send message to the client, since we have quorum to do so now.
if pmsg, ok := o.pendingDeliveries[sseq]; ok {
// Copy delivery subject and sequence first, as the send returns it to the pool and clears it.
dsubj, seq := pmsg.dsubj, pmsg.seq
o.outq.send(pmsg)
delete(o.pendingDeliveries, sseq)
// Might need to send a request timeout after sending the last replicated delivery.
if wd, ok := o.waitingDeliveries[dsubj]; ok && wd.seq == seq {
if wd.pn > 0 || wd.pb > 0 {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wd.pn, JSPullRequestPendingBytes, wd.pb)
o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
wd.recycle()
delete(o.waitingDeliveries, dsubj)
}
}
o.mu.Unlock()
if err != nil {

View File

@@ -197,6 +197,9 @@ const (
// JSConsumerPushMaxWaitingErr consumer in push mode can not set max waiting
JSConsumerPushMaxWaitingErr ErrorIdentifier = 10080
// JSConsumerPushWithPriorityGroupErr priority groups can not be used with push consumers
JSConsumerPushWithPriorityGroupErr ErrorIdentifier = 10178
// JSConsumerReplacementWithDifferentNameErr consumer replacement durable config not the same
JSConsumerReplacementWithDifferentNameErr ErrorIdentifier = 10106
@@ -570,6 +573,7 @@ var (
JSConsumerPullRequiresAckErr: {Code: 400, ErrCode: 10084, Description: "consumer in pull mode requires explicit ack policy on workqueue stream"},
JSConsumerPullWithRateLimitErr: {Code: 400, ErrCode: 10086, Description: "consumer in pull mode can not have rate limit set"},
JSConsumerPushMaxWaitingErr: {Code: 400, ErrCode: 10080, Description: "consumer in push mode can not set max waiting"},
JSConsumerPushWithPriorityGroupErr: {Code: 400, ErrCode: 10178, Description: "priority groups can not be used with push consumers"},
JSConsumerReplacementWithDifferentNameErr: {Code: 400, ErrCode: 10106, Description: "consumer replacement durable config not the same"},
JSConsumerReplicasExceedsStream: {Code: 400, ErrCode: 10126, Description: "consumer config replica count exceeds parent stream"},
JSConsumerReplicasShouldMatchStream: {Code: 400, ErrCode: 10134, Description: "consumer config replicas must match interest retention stream's replicas"},
@@ -1397,6 +1401,16 @@ func NewJSConsumerPushMaxWaitingError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSConsumerPushMaxWaitingErr]
}
// NewJSConsumerPushWithPriorityGroupError creates a new JSConsumerPushWithPriorityGroupErr error: "priority groups can not be used with push consumers"
func NewJSConsumerPushWithPriorityGroupError(opts ...ErrorOption) *ApiError {
	eopts := parseOpts(opts)
	// If the caller already provided an ApiError via options, pass it through.
	ae, ok := eopts.err.(*ApiError)
	if !ok {
		return ApiErrors[JSConsumerPushWithPriorityGroupErr]
	}
	return ae
}
// NewJSConsumerReplacementWithDifferentNameError creates a new JSConsumerReplacementWithDifferentNameErr error: "consumer replacement durable config not the same"
func NewJSConsumerReplacementWithDifferentNameError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)

View File

@@ -70,11 +70,20 @@ func wipeSlice(buf []byte) {
func validateTrustedOperators(o *Options) error {
if len(o.TrustedOperators) == 0 {
// if we have no operator, default sentinel shouldn't be set
if o.DefaultSentinel != "" {
if o.DefaultSentinel != _EMPTY_ {
return fmt.Errorf("default sentinel requires operators and accounts")
}
return nil
}
if o.DefaultSentinel != _EMPTY_ {
juc, err := jwt.DecodeUserClaims(o.DefaultSentinel)
if err != nil {
return fmt.Errorf("default sentinel JWT not valid")
}
if !juc.BearerToken {
return fmt.Errorf("default sentinel must be a bearer token")
}
}
if o.AccountResolver == nil {
return fmt.Errorf("operators require an account resolver to be configured")
}

View File

@@ -23,6 +23,8 @@ import (
"sync"
"time"
"github.com/nats-io/nats-server/v2/server/ats"
"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nats-server/v2/server/gsl"
"github.com/nats-io/nats-server/v2/server/stree"
@@ -71,6 +73,9 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
}
}
// Register with access time service.
ats.Register()
return ms, nil
}
@@ -86,7 +91,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
ms.cfg = *cfg
// Create or delete the THW if needed.
if cfg.AllowMsgTTL && ms.ttls == nil {
ms.ttls = thw.NewHashWheel()
ms.recoverTTLState()
} else if !cfg.AllowMsgTTL && ms.ttls != nil {
ms.ttls = nil
}
@@ -125,6 +130,30 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
return nil
}
// recoverTTLState rebuilds the per-message TTL timer wheel by scanning every
// stored message for a TTL header and re-registering its expiry.
// Lock should be held.
func (ms *memStore) recoverTTLState() {
	ms.ttls = thw.NewHashWheel()
	if ms.state.Msgs == 0 {
		return
	}
	// Rearm the age check timer once the scan completes.
	defer ms.resetAgeChk(0)
	var smv StoreMsg
	sm, seq, _ := ms.loadNextMsgLocked(fwcs, true, 0, &smv)
	for sm != nil {
		// Only messages with headers can carry a TTL.
		if len(sm.hdr) > 0 {
			if ttl, _ := getMessageTTL(sm.hdr); ttl > 0 {
				// Expiry is the message timestamp plus its TTL, in nanoseconds.
				expires := time.Duration(sm.ts) + (time.Second * time.Duration(ttl))
				ms.ttls.Add(seq, int64(expires))
			}
		}
		sm, seq, _ = ms.loadNextMsgLocked(fwcs, true, seq+1, &smv)
	}
}
// Stores a raw message with expected sequence number and timestamp.
// Lock should be held.
func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, ttl int64) error {
@@ -286,7 +315,7 @@ func (ms *memStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, i
// SkipMsg will use the next sequence number but not store anything.
func (ms *memStore) SkipMsg() uint64 {
// Grab time.
now := time.Now().UTC()
now := time.Unix(0, ats.AccessTime()).UTC()
ms.mu.Lock()
seq := ms.state.LastSeq + 1
@@ -294,7 +323,7 @@ func (ms *memStore) SkipMsg() uint64 {
ms.state.LastTime = now
if ms.state.Msgs == 0 {
ms.state.FirstSeq = seq + 1
ms.state.FirstTime = now
ms.state.FirstTime = time.Time{}
} else {
ms.dmap.Insert(seq)
}
@@ -305,7 +334,7 @@ func (ms *memStore) SkipMsg() uint64 {
// Skip multiple msgs.
func (ms *memStore) SkipMsgs(seq uint64, num uint64) error {
// Grab time.
now := time.Now().UTC()
now := time.Unix(0, ats.AccessTime()).UTC()
ms.mu.Lock()
defer ms.mu.Unlock()
@@ -322,7 +351,7 @@ func (ms *memStore) SkipMsgs(seq uint64, num uint64) error {
ms.state.LastSeq = lseq
ms.state.LastTime = now
if ms.state.Msgs == 0 {
ms.state.FirstSeq, ms.state.FirstTime = lseq+1, now
ms.state.FirstSeq, ms.state.FirstTime = lseq+1, time.Time{}
} else {
for ; seq <= lseq; seq++ {
ms.dmap.Insert(seq)
@@ -1075,7 +1104,7 @@ func (ms *memStore) expireMsgs() {
}
if sdmEnabled {
if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok {
sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0
sdm := last && isSubjectDeleteMarker(sm.hdr)
ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
@@ -1105,7 +1134,7 @@ func (ms *memStore) expireMsgs() {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0})
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)})
return false
}
} else {
@@ -1292,7 +1321,9 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.state.FirstTime = time.Time{}
ms.state.Bytes = 0
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*StoreMsg)
if ms.msgs != nil {
ms.msgs = make(map[uint64]*StoreMsg)
}
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.dmap.Empty()
ms.sdm.empty()
@@ -1421,9 +1452,9 @@ func (ms *memStore) Truncate(seq uint64) error {
ms.mu.Lock()
lsm, ok := ms.msgs[seq]
if !ok {
ms.mu.Unlock()
return ErrInvalidSequence
lastTime := ms.state.LastTime
if ok && lsm != nil {
lastTime = time.Unix(0, lsm.ts).UTC()
}
for i := ms.state.LastSeq; i > seq; i-- {
@@ -1438,8 +1469,8 @@ func (ms *memStore) Truncate(seq uint64) error {
}
}
// Reset last.
ms.state.LastSeq = lsm.seq
ms.state.LastTime = time.Unix(0, lsm.ts).UTC()
ms.state.LastSeq = seq
ms.state.LastTime = lastTime
// Update msgs and bytes.
if purged > ms.state.Msgs {
purged = ms.state.Msgs
@@ -1585,7 +1616,11 @@ func (ms *memStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *S
// LoadNextMsg returns the next message matching the filter subject at or
// after the start sequence, delegating to loadNextMsgLocked under the lock.
func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) {
	ms.mu.Lock()
	defer ms.mu.Unlock()
	sm, seq, err := ms.loadNextMsgLocked(filter, wc, start, smp)
	return sm, seq, err
}
// Lock should be held.
func (ms *memStore) loadNextMsgLocked(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) {
if start < ms.state.FirstSeq {
start = ms.state.FirstSeq
}
@@ -1927,15 +1962,24 @@ func (ms *memStore) Delete() error {
}
func (ms *memStore) Stop() error {
// These can't come back, so stop is same as Delete.
ms.Purge()
ms.mu.Lock()
if ms.msgs == nil {
ms.mu.Unlock()
return nil
}
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil
}
ms.msgs = nil
ms.mu.Unlock()
// These can't come back, so stop is same as Delete.
ms.Purge()
// Unregister from the access time service.
ats.Unregister()
return nil
}

View File

@@ -1184,184 +1184,185 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
// Varz will output server information on the monitoring port at /varz.
type Varz struct {
ID string `json:"server_id"`
Name string `json:"server_name"`
Version string `json:"version"`
Proto int `json:"proto"`
GitCommit string `json:"git_commit,omitempty"`
GoVersion string `json:"go"`
Host string `json:"host"`
Port int `json:"port"`
AuthRequired bool `json:"auth_required,omitempty"`
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
IP string `json:"ip,omitempty"`
ClientConnectURLs []string `json:"connect_urls,omitempty"`
WSConnectURLs []string `json:"ws_connect_urls,omitempty"`
MaxConn int `json:"max_connections"`
MaxSubs int `json:"max_subscriptions,omitempty"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPBasePath string `json:"http_base_path"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int32 `json:"max_control_line"`
MaxPayload int `json:"max_payload"`
MaxPending int64 `json:"max_pending"`
Cluster ClusterOptsVarz `json:"cluster,omitempty"`
Gateway GatewayOptsVarz `json:"gateway,omitempty"`
LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"`
MQTT MQTTOptsVarz `json:"mqtt,omitempty"`
Websocket WebsocketOptsVarz `json:"websocket,omitempty"`
JetStream JetStreamVarz `json:"jetstream,omitempty"`
TLSTimeout float64 `json:"tls_timeout"`
WriteDeadline time.Duration `json:"write_deadline"`
Start time.Time `json:"start"`
Now time.Time `json:"now"`
Uptime string `json:"uptime"`
Mem int64 `json:"mem"`
Cores int `json:"cores"`
MaxProcs int `json:"gomaxprocs"`
MemLimit int64 `json:"gomemlimit,omitempty"`
CPU float64 `json:"cpu"`
Connections int `json:"connections"`
TotalConnections uint64 `json:"total_connections"`
Routes int `json:"routes"`
Remotes int `json:"remotes"`
Leafs int `json:"leafnodes"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
SlowConsumers int64 `json:"slow_consumers"`
Subscriptions uint32 `json:"subscriptions"`
HTTPReqStats map[string]uint64 `json:"http_req_stats"`
ConfigLoadTime time.Time `json:"config_load_time"`
ConfigDigest string `json:"config_digest"`
Tags jwt.TagList `json:"tags,omitempty"`
TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"`
TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"`
SystemAccount string `json:"system_account,omitempty"`
PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"`
OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"`
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"`
ID string `json:"server_id"` // ID is the unique server ID generated at start
Name string `json:"server_name"` // Name is the configured server name, equals ID when not set
Version string `json:"version"` // Version is the version of the running server
Proto int `json:"proto"` // Proto is the protocol version this server supports
GitCommit string `json:"git_commit,omitempty"` // GitCommit is the git repository commit hash that the build corresponds with
GoVersion string `json:"go"` // GoVersion is the version of Go used to build this binary
Host string `json:"host"` // Host is the hostname the server runs on
Port int `json:"port"` // Port is the port the server listens on for client connections
AuthRequired bool `json:"auth_required,omitempty"` // AuthRequired indicates if users are required to authenticate to join the server
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if connections must use TLS when connecting to this server
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full TLS verification will be performed
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if the OCSP protocol will be used to verify peers
IP string `json:"ip,omitempty"` // IP is the IP address the server listens on if set
ClientConnectURLs []string `json:"connect_urls,omitempty"` // ClientConnectURLs is the list of URLs NATS clients can use to connect to this server
WSConnectURLs []string `json:"ws_connect_urls,omitempty"` // WSConnectURLs is the list of URLs websocket clients can use to connect to this server
MaxConn int `json:"max_connections"` // MaxConn is the maximum amount of connections the server can accept
MaxSubs int `json:"max_subscriptions,omitempty"` // MaxSubs is the maximum amount of subscriptions the server can manage
PingInterval time.Duration `json:"ping_interval"` // PingInterval is the interval the server will send PING messages during periods of inactivity on a connection
MaxPingsOut int `json:"ping_max"` // MaxPingsOut is the number of unanswered PINGs after which the connection will be considered stale
HTTPHost string `json:"http_host"` // HTTPHost is the HTTP host monitoring connections are accepted on
HTTPPort int `json:"http_port"` // HTTPPort is the port monitoring connections are accepted on
HTTPBasePath string `json:"http_base_path"` // HTTPBasePath is the path prefix for access to monitor endpoints
HTTPSPort int `json:"https_port"` // HTTPSPort is the HTTPS host monitoring connections are accepted on`
AuthTimeout float64 `json:"auth_timeout"` // AuthTimeout is the amount of seconds connections have to complete authentication
MaxControlLine int32 `json:"max_control_line"` // MaxControlLine is the amount of bytes a signal control message may be
MaxPayload int `json:"max_payload"` // MaxPayload is the maximum amount of bytes a message may have as payload
MaxPending int64 `json:"max_pending"` // MaxPending is the maximum amount of unprocessed bytes a connection may have
Cluster ClusterOptsVarz `json:"cluster,omitempty"` // Cluster is the Cluster state
Gateway GatewayOptsVarz `json:"gateway,omitempty"` // Gateway is the Super Cluster state
LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` // LeafNode is the Leafnode state
MQTT MQTTOptsVarz `json:"mqtt,omitempty"` // MQTT is the MQTT state
Websocket WebsocketOptsVarz `json:"websocket,omitempty"` // Websocket is the Websocket client state
JetStream JetStreamVarz `json:"jetstream,omitempty"` // JetStream is the JetStream state
TLSTimeout float64 `json:"tls_timeout"` // TLSTimeout is how long TLS operations have to complete
WriteDeadline time.Duration `json:"write_deadline"` // WriteDeadline is the maximum time writes to sockets have to complete
Start time.Time `json:"start"` // Start is time when the server was started
Now time.Time `json:"now"` // Now is the current time of the server
Uptime string `json:"uptime"` // Uptime is how long the server has been running
Mem int64 `json:"mem"` // Mem is the resident memory allocation
Cores int `json:"cores"` // Cores is the number of cores the process has access to
MaxProcs int `json:"gomaxprocs"` // MaxProcs is the configured GOMAXPROCS value
MemLimit int64 `json:"gomemlimit,omitempty"` // MemLimit is the configured GOMEMLIMIT value
CPU float64 `json:"cpu"` // CPU is the current total CPU usage
Connections int `json:"connections"` // Connections is the current connected connections
TotalConnections uint64 `json:"total_connections"` // TotalConnections is the total connections the server have ever handled
Routes int `json:"routes"` // Routes is the number of connected route servers
Remotes int `json:"remotes"` // Remotes is the configured route remote endpoints
Leafs int `json:"leafnodes"` // Leafs is the number connected leafnode clients
InMsgs int64 `json:"in_msgs"` // InMsgs is the number of messages this server received
OutMsgs int64 `json:"out_msgs"` // OutMsgs is the number of message this server sent
InBytes int64 `json:"in_bytes"` // InBytes is the number of bytes this server received
OutBytes int64 `json:"out_bytes"` // OutMsgs is the number of bytes this server sent
SlowConsumers int64 `json:"slow_consumers"` // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers
Subscriptions uint32 `json:"subscriptions"` // Subscriptions is the count of active subscriptions
HTTPReqStats map[string]uint64 `json:"http_req_stats"` // HTTPReqStats is the number of requests each HTTP endpoint received
ConfigLoadTime time.Time `json:"config_load_time"` // ConfigLoadTime is the time the configuration was loaded or reloaded
ConfigDigest string `json:"config_digest"` // ConfigDigest is a calculated hash of the current configuration
Tags jwt.TagList `json:"tags,omitempty"` // Tags are the tags assigned to the server in configuration
Metadata map[string]string `json:"metadata,omitempty"` // Metadata is the metadata assigned to the server in configuration
TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"` // TrustedOperatorsJwt is the JWTs for all trusted operators
TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"` // TrustedOperatorsClaim is the decoded claims for each trusted operator
SystemAccount string `json:"system_account,omitempty"` // SystemAccount is the name of the System account
PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` // PinnedAccountFail is how often user logon fails due to the issuer account not being pinned.
OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` // OCSPResponseCache is the state of the OCSP cache // OCSPResponseCache holds information about
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats is statistics about all detected Slow Consumer
}
// JetStreamVarz contains basic runtime information about jetstream
type JetStreamVarz struct {
Config *JetStreamConfig `json:"config,omitempty"`
Stats *JetStreamStats `json:"stats,omitempty"`
Meta *MetaClusterInfo `json:"meta,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
Config *JetStreamConfig `json:"config,omitempty"` // Config is the active JetStream configuration
Stats *JetStreamStats `json:"stats,omitempty"` // Stats is the statistics for the JetStream server
Meta *MetaClusterInfo `json:"meta,omitempty"` // Meta is information about the JetStream metalayer
Limits *JSLimitOpts `json:"limits,omitempty"` // Limits are the configured JetStream limits
}
// ClusterOptsVarz contains monitoring cluster information
type ClusterOptsVarz struct {
Name string `json:"name,omitempty"`
Host string `json:"addr,omitempty"`
Port int `json:"cluster_port,omitempty"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
URLs []string `json:"urls,omitempty"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
PoolSize int `json:"pool_size,omitempty"`
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections
Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size
}
// GatewayOptsVarz contains monitoring gateway information
type GatewayOptsVarz struct {
Name string `json:"name,omitempty"`
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
Advertise string `json:"advertise,omitempty"`
ConnectRetries int `json:"connect_retries,omitempty"`
Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"`
RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections
Port int `json:"port,omitempty"` // Port is the post gateway connections listens on
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients
ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes
RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected
}
// RemoteGatewayOptsVarz contains monitoring remote gateway information
type RemoteGatewayOptsVarz struct {
Name string `json:"name"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
URLs []string `json:"urls,omitempty"`
Name string `json:"name"` // Name is the name of the remote gateway
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
URLs []string `json:"urls,omitempty"` // URLs is the list of Gateway URLs
}
// LeafNodeOptsVarz contains monitoring leaf node information
type LeafNodeOptsVarz struct {
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"`
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
Host string `json:"host,omitempty"` // Host is the host the server listens on
Port int `json:"port,omitempty"` // Port is the port the server listens on
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time Leafnode connections have to complete authentication
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"` // Remotes is state of configured Leafnode remotes
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed
}
// DenyRules Contains lists of subjects not allowed to be imported/exported
type DenyRules struct {
Exports []string `json:"exports,omitempty"`
Imports []string `json:"imports,omitempty"`
Exports []string `json:"exports,omitempty"` // Exports are denied exports
Imports []string `json:"imports,omitempty"` // Imports are denied imports
}
// RemoteLeafOptsVarz contains monitoring remote leaf node information
type RemoteLeafOptsVarz struct {
LocalAccount string `json:"local_account,omitempty"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
URLs []string `json:"urls,omitempty"`
Deny *DenyRules `json:"deny,omitempty"`
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
LocalAccount string `json:"local_account,omitempty"` // LocalAccount is the local account this leaf is logged into
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
URLs []string `json:"urls,omitempty"` // URLs is the list of URLs for the remote Leafnode connection
Deny *DenyRules `json:"deny,omitempty"` // Deny is the configured import and exports that the Leafnode may not access
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
}
// MQTTOptsVarz contains monitoring MQTT information
type MQTTOptsVarz struct {
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
NoAuthUser string `json:"no_auth_user,omitempty"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
TLSMap bool `json:"tls_map,omitempty"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"`
JsDomain string `json:"js_domain,omitempty"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxAckPending uint16 `json:"max_ack_pending,omitempty"`
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
Host string `json:"host,omitempty"` // Host is the host the server listens on
Port int `json:"port,omitempty"` // Port is the port the server listens on
NoAuthUser string `json:"no_auth_user,omitempty"` // NoAuthUser is the user that will be used for unauthenticated connections
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is how long authentication has to complete
TLSMap bool `json:"tls_map,omitempty"` // TLSMap indicates if TLS Mapping is enabled
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` // TLSPinnedCerts is the list of certificates pinned to this connection
JsDomain string `json:"js_domain,omitempty"` // JsDomain is the JetStream domain used for MQTT state
AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete
MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
}
// WebsocketOptsVarz contains monitoring websocket information
type WebsocketOptsVarz struct {
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
Advertise string `json:"advertise,omitempty"`
NoAuthUser string `json:"no_auth_user,omitempty"`
JWTCookie string `json:"jwt_cookie,omitempty"`
HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
NoTLS bool `json:"no_tls,omitempty"`
TLSMap bool `json:"tls_map,omitempty"`
TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"`
SameOrigin bool `json:"same_origin,omitempty"`
AllowedOrigins []string `json:"allowed_origins,omitempty"`
Compression bool `json:"compression,omitempty"`
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
Host string `json:"host,omitempty"` // Host is the host the server listens on
Port int `json:"port,omitempty"` // Port is the port the server listens on
Advertise string `json:"advertise,omitempty"` // Advertise is the connection URL the server advertises
NoAuthUser string `json:"no_auth_user,omitempty"` // NoAuthUser is the user that will be used for unauthenticated connections
JWTCookie string `json:"jwt_cookie,omitempty"` // JWTCookie is the name of a cookie the server will read for the connection JWT
HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"` // HandshakeTimeout is how long the connection has to complete the websocket setup
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is how long authentication has to complete
NoTLS bool `json:"no_tls,omitempty"` // NoTLS indicates if TLS is disabled
TLSMap bool `json:"tls_map,omitempty"` // TLSMap indicates if TLS Mapping is enabled
TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` // TLSPinnedCerts is the list of certificates pinned to this connection
SameOrigin bool `json:"same_origin,omitempty"` // SameOrigin indicates if same origin connections are allowed
AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins
Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
}
// OCSPResponseCacheVarz contains OCSP response cache information
type OCSPResponseCacheVarz struct {
Type string `json:"cache_type,omitempty"`
Hits int64 `json:"cache_hits,omitempty"`
Misses int64 `json:"cache_misses,omitempty"`
Responses int64 `json:"cached_responses,omitempty"`
Revokes int64 `json:"cached_revoked_responses,omitempty"`
Goods int64 `json:"cached_good_responses,omitempty"`
Unknowns int64 `json:"cached_unknown_responses,omitempty"`
Type string `json:"cache_type,omitempty"` // Type is the kind of cache being used
Hits int64 `json:"cache_hits,omitempty"` // Hits is how many times the cache was able to answer a request
Misses int64 `json:"cache_misses,omitempty"` // Misses is how many times the cache failed to answer a request
Responses int64 `json:"cached_responses,omitempty"` // Responses is how many responses are currently stored in the cache
Revokes int64 `json:"cached_revoked_responses,omitempty"` // Revokes is how many of the stored cache entries are revokes
Goods int64 `json:"cached_good_responses,omitempty"` // Goods is how many of the stored cache entries are good responses
Unknowns int64 `json:"cached_unknown_responses,omitempty"` // Unknowns is how many of the stored cache entries are unknown responses
}
// VarzOptions are the options passed to Varz().
@@ -1370,10 +1371,10 @@ type VarzOptions struct{}
// SlowConsumersStats contains information about the slow consumers from different type of connections.
type SlowConsumersStats struct {
Clients uint64 `json:"clients"`
Routes uint64 `json:"routes"`
Gateways uint64 `json:"gateways"`
Leafs uint64 `json:"leafs"`
Clients uint64 `json:"clients"` // Clients is how many Clients were slow consumers
Routes uint64 `json:"routes"` // Routes is how many Routes were slow consumers
Gateways uint64 `json:"gateways"` // Gateways is how many Gateways were slow consumers
Leafs uint64 `json:"leafs"` // Leafs is how many Leafnodes were slow consumers
}
func myUptime(d time.Duration) string {
@@ -1431,6 +1432,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
a.last { padding-bottom: 16px }
a.version { font-size: 14; font-weight: 400; width: 312px; text-align: right; margin-top: -2rem }
a.version:hover { color: rgb(22 22 32) }
.endpoint { font-size: 12px; color: #999; font-family: monospace; display: none }
a:hover .endpoint { display: inline }
</style>
</head>
@@ -1441,33 +1444,33 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
</div>
<br/>
<a href=.%s>General</a>
<a href=.%s>JetStream</a>
<a href=.%s>Connections</a>
<a href=.%s>Accounts</a>
<a href=.%s>Account Stats</a>
<a href=.%s>Subscriptions</a>
<a href=.%s>Routes</a>
<a href=.%s>LeafNodes</a>
<a href=.%s>Gateways</a>
<a href=.%s>Raft Groups</a>
<a href=.%s class=last>Health Probe</a>
<a href=.%s>General<span class="endpoint"> %s</span></a>
<a href=.%s>JetStream<span class="endpoint"> %s</span></a>
<a href=.%s>Connections<span class="endpoint"> %s</span></a>
<a href=.%s>Accounts<span class="endpoint"> %s</span></a>
<a href=.%s>Account Stats<span class="endpoint"> %s</span></a>
<a href=.%s>Subscriptions<span class="endpoint"> %s</span></a>
<a href=.%s>Routes<span class="endpoint"> %s</span></a>
<a href=.%s>LeafNodes<span class="endpoint"> %s</span></a>
<a href=.%s>Gateways<span class="endpoint"> %s</span></a>
<a href=.%s>Raft Groups<span class="endpoint"> %s</span></a>
<a href=.%s class=last>Health Probe<span class="endpoint"> %s</span></a>
<a href=https://docs.nats.io/running-a-nats-service/nats_admin/monitoring class="help">Help</a>
</body>
</html>`,
srcUrl,
VERSION,
s.basePath(VarzPath),
s.basePath(JszPath),
s.basePath(ConnzPath),
s.basePath(AccountzPath),
s.basePath(AccountStatzPath),
s.basePath(SubszPath),
s.basePath(RoutezPath),
s.basePath(LeafzPath),
s.basePath(GatewayzPath),
s.basePath(RaftzPath),
s.basePath(HealthzPath),
s.basePath(VarzPath), VarzPath,
s.basePath(JszPath), JszPath,
s.basePath(ConnzPath), ConnzPath,
s.basePath(AccountzPath), AccountzPath,
s.basePath(AccountStatzPath), AccountStatzPath,
s.basePath(SubszPath), SubszPath,
s.basePath(RoutezPath), RoutezPath,
s.basePath(LeafzPath), LeafzPath,
s.basePath(GatewayzPath), GatewayzPath,
s.basePath(RaftzPath), RaftzPath,
s.basePath(HealthzPath), HealthzPath,
)
}
@@ -2245,6 +2248,7 @@ type LeafzOptions struct {
// LeafInfo has detailed information on each remote leafnode connection.
type LeafInfo struct {
ID uint64 `json:"id"`
Name string `json:"name"`
IsSpoke bool `json:"is_spoke"`
Account string `json:"account"`
@@ -2287,6 +2291,7 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) {
for _, ln := range lconns {
ln.mu.Lock()
lni := &LeafInfo{
ID: ln.cid,
Name: ln.leaf.remoteServer,
IsSpoke: ln.isSpokeLeafNode(),
Account: ln.acc.Name,
@@ -2873,12 +2878,12 @@ type AccountDetail struct {
// MetaClusterInfo shows information about the meta group.
type MetaClusterInfo struct {
Name string `json:"name,omitempty"`
Leader string `json:"leader,omitempty"`
Peer string `json:"peer,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
Size int `json:"cluster_size"`
Pending int `json:"pending"`
Name string `json:"name,omitempty"` // Name is the name of the cluster
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
Size int `json:"cluster_size"` // Size is the known size of the cluster
Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed
}
// JSInfo has detailed information on JetStream.

View File

@@ -247,11 +247,12 @@ type RemoteLeafOpts struct {
JetStreamClusterMigrateDelay time.Duration `json:"jetstream_cluster_migrate_delay,omitempty"`
}
// JSLimitOpts are active limits for the meta cluster
type JSLimitOpts struct {
MaxRequestBatch int `json:"max_request_batch,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
MaxHAAssets int `json:"max_ha_assets,omitempty"`
Duplicates time.Duration `json:"max_duplicate_window,omitempty"`
MaxRequestBatch int `json:"max_request_batch,omitempty"` // MaxRequestBatch is the maximum amount of updates that can be sent in a batch
MaxAckPending int `json:"max_ack_pending,omitempty"` // MaxAckPending is the server limit for maximum amount of outstanding Acks
MaxHAAssets int `json:"max_ha_assets,omitempty"` // MaxHAAssets is the maximum of Streams and Consumers that may have more than 1 replica
Duplicates time.Duration `json:"max_duplicate_window,omitempty"` // Duplicates is the maximum value for duplicate tracking on Streams
}
type JSTpmOpts struct {

View File

@@ -142,7 +142,7 @@ type raft struct {
wal WAL // WAL store (filestore or memstore)
wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage
track bool //
bytes uint64 // Total amount of bytes stored in the WAL. (Saves us from needing to call wal.FastState very often)
werr error // Last write error
state atomic.Int32 // RaftState
@@ -164,34 +164,31 @@ type raft struct {
llqrt time.Time // Last quorum lost time
lsut time.Time // Last scale-up time
term uint64 // The current vote term
pterm uint64 // Previous term from the last snapshot
pindex uint64 // Previous index from the last snapshot
commit uint64 // Index of the most recent commit
applied uint64 // Index of the most recently applied commit
term uint64 // The current vote term
pterm uint64 // Previous term from the last snapshot
pindex uint64 // Previous index from the last snapshot
commit uint64 // Index of the most recent commit
applied uint64 // Index of the most recently applied commit
papplied uint64 // First sequence of our log, matches when we last installed a snapshot.
aflr uint64 // Index when to signal initial messages have been applied after becoming leader. 0 means signaling is disabled.
leader string // The ID of the leader
vote string // Our current vote state
lxfer bool // Are we doing a leadership transfer?
hcbehind bool // Were we falling behind at the last health check? (see: isCurrent)
s *Server // Reference to top-level server
c *client // Internal client for subscriptions
js *jetStream // JetStream, if running, to see if we are out of resources
dflag bool // Debug flag
hasleader atomic.Bool // Is there a group leader right now?
pleader atomic.Bool // Has the group ever had a leader?
isSysAcc atomic.Bool // Are we utilizing the system account?
maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up.
observer bool // The node is observing, i.e. not able to become leader
hasleader atomic.Bool // Is there a group leader right now?
pleader atomic.Bool // Has the group ever had a leader?
isSysAcc atomic.Bool // Are we utilizing the system account?
extSt extensionState // Extension state
track bool // Whether out of resources checking is enabled.
dflag bool // Debug flag
psubj string // Proposals subject
rpsubj string // Remove peers subject
vsubj string // Vote requests subject
@@ -208,9 +205,7 @@ type raft struct {
catchup *catchupState // For when we need to catch up as a follower.
progress map[string]*ipQueue[uint64] // For leader or server catching up a follower.
paused bool // Whether or not applies are paused
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused
hcommit uint64 // The commit at the time that applies were paused
prop *ipQueue[*proposedEntry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
@@ -220,6 +215,13 @@ type raft struct {
votes *ipQueue[*voteResponse] // Vote responses
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
lxfer bool // Are we doing a leadership transfer?
hcbehind bool // Were we falling behind at the last health check? (see: isCurrent)
maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up.
paused bool // Whether or not applies are paused
observer bool // The node is observing, i.e. not able to become leader
pobserver bool // Were we previously an observer?
}
type proposedEntry struct {
@@ -439,6 +441,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
// Can't recover snapshots if memory based since wal will be reset.
// We will inherit from the current leader.
n.papplied = 0
if _, ok := n.wal.(*memStore); ok {
_ = os.RemoveAll(filepath.Join(n.sd, snapshotsDir))
} else {
@@ -462,6 +465,8 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
// we will try to replay them and process them here.
var state StreamState
n.wal.FastState(&state)
n.bytes = state.Bytes
if state.Msgs > 0 {
n.debug("Replaying state of %d entries", state.Msgs)
if first, err := n.loadFirstEntry(); err == nil {
@@ -1092,13 +1097,11 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
// Calculate the number of entries and estimate the byte size that
// we can now remove with a compaction/snapshot.
var state StreamState
n.wal.FastState(&state)
if n.applied > state.FirstSeq {
entries = n.applied - state.FirstSeq
if n.applied > n.papplied {
entries = n.applied - n.papplied
}
if state.Msgs > 0 {
bytes = entries * state.Bytes / state.Msgs
if n.bytes > 0 {
bytes = entries * n.bytes / (n.pindex - n.papplied)
}
return entries, bytes
}
@@ -1184,7 +1187,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
return errNoSnapAvailable
}
n.debug("Installing snapshot of %d bytes", len(data))
n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied)
return n.installSnapshot(&snapshot{
lastTerm: term,
@@ -1218,6 +1221,10 @@ func (n *raft) installSnapshot(snap *snapshot) error {
return err
}
var state StreamState
n.wal.FastState(&state)
n.papplied = snap.lastIndex
n.bytes = state.Bytes
return nil
}
@@ -1314,8 +1321,10 @@ func (n *raft) setupLastSnapshot() {
// Compact the WAL when we're done if needed.
n.pindex = snap.lastIndex
n.pterm = snap.lastTerm
// Explicitly only set commit, and not applied.
// Applied will move up when the snapshot is actually applied.
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.papplied = snap.lastIndex
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
@@ -1696,12 +1705,12 @@ func (n *raft) Progress() (index, commit, applied uint64) {
}
// Size returns number of entries and total bytes for our WAL.
func (n *raft) Size() (uint64, uint64) {
func (n *raft) Size() (entries uint64, bytes uint64) {
n.RLock()
var state StreamState
n.wal.FastState(&state)
entries = n.pindex - n.papplied
bytes = n.bytes
n.RUnlock()
return state.Msgs, state.Bytes
return entries, bytes
}
func (n *raft) ID() string {
@@ -1957,7 +1966,7 @@ func (n *raft) run() {
n.apply.push(nil)
runner:
for s.isRunning() {
for {
switch n.State() {
case Follower:
n.runAsFollower()
@@ -2691,7 +2700,7 @@ func (n *raft) loadFirstEntry() (ae *appendEntry, err error) {
func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64]) {
n.RLock()
s, reply := n.s, n.areply
peer, subj, term, last := ar.peer, ar.reply, n.term, n.pindex
peer, subj, term, pterm, last := ar.peer, ar.reply, n.term, n.pterm, n.pindex
n.RUnlock()
defer s.grWG.Done()
@@ -2713,7 +2722,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64
indexUpdatesQ.unregister()
}()
n.debug("Running catchup for %q", peer)
n.debug("Running catchup for %q [%d:%d] to [%d:%d]", peer, ar.term, ar.index, pterm, last)
const maxOutstanding = 2 * 1024 * 1024 // 2MB for now.
next, total, om := uint64(0), 0, make(map[uint64]int)
@@ -2919,9 +2928,7 @@ func (n *raft) applyCommit(index uint64) error {
ae := n.pae[index]
if ae == nil {
var state StreamState
n.wal.FastState(&state)
if index < state.FirstSeq {
if index < n.papplied {
return nil
}
var err error
@@ -3306,24 +3313,15 @@ func (n *raft) truncateWAL(term, index uint64) {
if n.applied > n.commit {
n.applied = n.commit
}
if n.papplied > n.applied {
n.papplied = n.applied
}
}()
if err := n.wal.Truncate(index); err != nil {
// If we get an invalid sequence, reset our wal all together.
// We will not have holes, so this means we do not have this message stored anymore.
// This is normal when truncating back to applied/snapshot.
if err == ErrInvalidSequence {
n.debug("Clearing WAL")
n.wal.Truncate(0)
// If our index is non-zero use PurgeEx to set us to the correct next index.
if index > 0 {
n.wal.PurgeEx(fwcs, index+1, 0)
}
} else {
n.warn("Error truncating WAL: %v", err)
n.setWriteErrLocked(err)
return
}
n.warn("Error truncating WAL: %v", err)
n.setWriteErrLocked(err)
return
}
// Set after we know we have truncated properly.
n.pterm, n.pindex = term, index
@@ -3515,29 +3513,39 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if ae.pterm != n.pterm || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
n.debug("AppendEntry detected pindex less than/equal to ours: [%d:%d] vs [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse
var success bool
if ae.pindex < n.commit {
// If we have already committed this entry, just mark success.
success = true
n.debug("AppendEntry pindex %d below commit %d, marking success", ae.pindex, n.commit)
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
success = true
n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex, n.commit)
} else if ae.pindex == n.pindex {
// Check if only our terms do not match here.
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else if ae.pindex == n.applied {
// Entry can't be found, this is normal because we have a snapshot at this index.
// Truncate back to where we've created the snapshot.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
n.resetWAL()
snap, err := n.loadLastSnapshot()
if err == nil && snap.lastIndex == ae.pindex && snap.lastTerm == ae.pterm {
// Entry can't be found, this is normal because we have a snapshot at this index.
// Truncate back to where we've created the snapshot.
n.truncateWAL(snap.lastTerm, snap.lastIndex)
// Only continue if truncation was successful, and we ended up such that we can safely continue.
if ae.pterm == n.pterm && ae.pindex == n.pindex {
goto CONTINUE
}
} else {
// Otherwise, something has gone very wrong and we need to reset.
n.resetWAL()
}
}
} else if eae.term == ae.pterm {
// If terms match we can delete all entries past this one, and then continue storing the current entry.
@@ -3594,12 +3602,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.pterm = ae.pterm
n.commit = ae.pindex
if _, err := n.wal.Compact(n.pindex + 1); err != nil {
n.setWriteErrLocked(err)
n.Unlock()
return
}
snap := &snapshot{
lastTerm: n.pterm,
lastIndex: n.pindex,
@@ -3620,7 +3622,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
// Setup our state for catching up.
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
n.debug("AppendEntry did not match [%d:%d] with [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex)
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
n.Unlock()
@@ -3836,6 +3838,13 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
return errEntryStoreFailed
}
var sz uint64
if n.wtype == FileStorage {
sz = fileStoreMsgSize(_EMPTY_, nil, ae.buf)
} else {
sz = memStoreMsgSize(_EMPTY_, nil, ae.buf)
}
n.bytes += sz
n.pterm = ae.term
n.pindex = seq
return nil
@@ -4083,7 +4092,6 @@ func (n *raft) setWriteErrLocked(err error) {
// Ignore non-write errors.
if err == ErrStoreClosed ||
err == ErrStoreEOF ||
err == ErrInvalidSequence ||
err == ErrStoreMsgNotFound ||
err == errNoPending ||
err == errPartialCache {
@@ -4428,11 +4436,8 @@ func (n *raft) switchToLeader() {
n.debug("Switching to leader")
var state StreamState
n.wal.FastState(&state)
// Check if we have items pending as we are taking over.
sendHB := state.LastSeq > n.commit
sendHB := n.pindex > n.commit
n.lxfer = false
n.updateLeader(n.id)

View File

@@ -13,7 +13,10 @@
package server
import "time"
import (
"bytes"
"time"
)
// SDMMeta holds pending/proposed data for subject delete markers or message removals.
type SDMMeta struct {
@@ -40,6 +43,12 @@ func newSDMMeta() *SDMMeta {
}
}
// isSubjectDeleteMarker returns whether the headers indicate this message is a subject delete marker.
// Either it's a usual marker carrying a JSMarkerReason header, or it's a KV Purge marker, signaled by
// the KVOperation header having the PURGE value.
func isSubjectDeleteMarker(hdr []byte) bool {
	// The original condition was inverted: it returned true only when the marker
	// headers were absent. A marker is present if EITHER header matches.
	return len(sliceHeader(JSMarkerReason, hdr)) != 0 ||
		bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge)
}
// empty clears all data.
func (sdm *SDMMeta) empty() {
if sdm == nil {

View File

@@ -2704,7 +2704,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
// Setup state that can enable shutdown
s.mu.Lock()
hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
l, e := natsListen("tcp", hp)
l, e := s.getServerListener(hp)
s.listenerErr = e
if e != nil {
s.mu.Unlock()
@@ -2760,6 +2760,18 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
clr = nil
}
// getServerListener returns a network listener for the given host-port address.
// If the Server already has an active listener (s.listener), it returns that listener
// along with any previous error (s.listenerErr). Otherwise, it creates and returns
// a new TCP listener on the specified address using natsListen.
func (s *Server) getServerListener(hp string) (net.Listener, error) {
	// No existing listener: create a fresh TCP listener on the requested address.
	if s.listener == nil {
		return natsListen("tcp", hp)
	}
	// Reuse the already-established listener, propagating any prior listen error.
	return s.listener, s.listenerErr
}
// InProcessConn returns an in-process connection to the server,
// avoiding the need to use a TCP listener for local connectivity
// within the same process. This can be used regardless of the

View File

@@ -61,8 +61,6 @@ var (
ErrStoreWrongType = errors.New("wrong storage type")
// ErrNoAckPolicy is returned when trying to update a consumer's acks with no ack policy.
ErrNoAckPolicy = errors.New("ack policy is none")
// ErrInvalidSequence is returned when the sequence is not present in the stream store.
ErrInvalidSequence = errors.New("invalid sequence")
// ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong.
ErrSequenceMismatch = errors.New("expected sequence does not match store")
// ErrCorruptStreamState

View File

@@ -223,12 +223,12 @@ type ClusterInfo struct {
// PeerInfo shows information about all the peers in the cluster that
// are supporting the stream or consumer.
type PeerInfo struct {
Name string `json:"name"`
Current bool `json:"current"`
Offline bool `json:"offline,omitempty"`
Active time.Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
Peer string `json:"peer"`
Name string `json:"name"` // Name is the unique name for the peer
Current bool `json:"current"` // Current indicates if it was seen recently and fully caught up
Offline bool `json:"offline,omitempty"` // Offline indicates if it has not been seen recently
Active time.Duration `json:"active"` // Active is the timestamp it was last active
Lag uint64 `json:"lag,omitempty"` // Lag is how many operations behind it is
Peer string `json:"peer"` // Peer is the unique ID for the peer
// For migrations.
cluster string
}
@@ -432,6 +432,12 @@ const (
JSMarkerReason = "Nats-Marker-Reason"
)
// Headers for published KV messages.
var (
	// KVOperation is the header key naming the KV operation that produced the message.
	KVOperation = "KV-Operation"
	// KVOperationValuePurge is the KVOperation header value that marks a purge marker.
	KVOperationValuePurge = []byte("PURGE")
)
// Headers for republished messages and direct gets.
const (
JSStream = "Nats-Stream"
@@ -1873,7 +1879,7 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server, pedan
// Check limits. We need some extra handling to allow updating MaxBytes.
// First, let's calculate the difference between the new and old MaxBytes.
maxBytesDiff := cfg.MaxBytes - old.MaxBytes
maxBytesDiff := max(cfg.MaxBytes, 0) - max(old.MaxBytes, 0)
if maxBytesDiff < 0 {
// If we're updating to a lower MaxBytes (maxBytesDiff is negative),
// then set to zero so checkBytesLimits doesn't set addBytes to 1.
@@ -2205,7 +2211,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool,
mset.mu.Unlock()
if js != nil {
maxBytesDiff := cfg.MaxBytes - ocfg.MaxBytes
maxBytesDiff := max(cfg.MaxBytes, 0) - max(ocfg.MaxBytes, 0)
if maxBytesDiff > 0 {
// Reserve the difference
js.reserveStreamResources(&StreamConfig{

View File

@@ -1433,6 +1433,12 @@ func tokenizeSubjectIntoSlice(tts []string, subject string) []string {
return tts
}
// SubjectMatchesFilter returns true if the subject matches the provided
// filter or false otherwise.
func SubjectMatchesFilter(subject, filter string) bool {
	matched := subjectIsSubsetMatch(subject, filter)
	return matched
}
// Calls into the function isSubsetMatch()
func subjectIsSubsetMatch(subject, test string) bool {
tsa := [32]string{}

View File

@@ -152,51 +152,49 @@ func (hw *HashWheel) Update(seq uint64, oldExpires int64, newExpires int64) erro
// ExpireTasks processes all expired tasks using a callback, but only expires a task if the callback returns true.
func (hw *HashWheel) ExpireTasks(callback func(seq uint64, expires int64) bool) {
	// Delegate to the timestamped variant, pinned to the current wall clock.
	hw.expireTasks(time.Now().UnixNano(), callback)
}
func (hw *HashWheel) expireTasks(ts int64, callback func(seq uint64, expires int64) bool) {
// Quick return if nothing is expired.
if hw.lowest > now {
if hw.lowest > ts {
return
}
// Start from the slot containing the lowest expiration.
startPos, exitPos := hw.getPosition(hw.lowest), hw.getPosition(now+tickDuration)
var updateLowest bool
for offset := int64(0); ; offset++ {
pos := (startPos + offset) & wheelMask
if pos == exitPos {
if updateLowest {
hw.updateLowestExpires()
globalLowest := int64(math.MaxInt64)
for pos, s := range hw.wheel {
// Skip s if nothing to expire.
if s == nil || s.lowest > ts {
if s != nil && s.lowest < globalLowest {
globalLowest = s.lowest
}
return
}
// Grab our slot.
slot := hw.wheel[pos]
if slot == nil || slot.lowest > now {
continue
}
// Track new lowest while processing expirations
newLowest := int64(math.MaxInt64)
for seq, expires := range slot.entries {
if expires <= now && callback(seq, expires) {
delete(slot.entries, seq)
slotLowest := int64(math.MaxInt64)
for seq, expires := range s.entries {
if expires <= ts && callback(seq, expires) {
delete(s.entries, seq)
hw.count--
updateLowest = true
continue
}
if expires < newLowest {
newLowest = expires
if expires < slotLowest {
slotLowest = expires
}
}
// Nil out if we are empty.
if len(slot.entries) == 0 {
if len(s.entries) == 0 {
hw.wheel[pos] = nil
} else {
slot.lowest = newLowest
s.lowest = slotLowest
if slotLowest < globalLowest {
globalLowest = slotLowest
}
}
}
hw.lowest = globalLowest
}
// GetNextExpiration returns the earliest expiration time before the given time.

2
vendor/modules.txt vendored
View File

@@ -996,7 +996,7 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.7.4
## explicit; go 1.23.0
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.11.6
# github.com/nats-io/nats-server/v2 v2.11.7
## explicit; go 1.23.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand