Bump github.com/nats-io/nats-server/v2 from 2.10.4 to 2.10.5

Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.4 to 2.10.5.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.10.4...v2.10.5)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-11-27 08:19:15 +00:00
committed by Ralf Haferkamp
parent c017037c71
commit 1ca166ff48
18 changed files with 1066 additions and 665 deletions

6
go.mod
View File

@@ -60,7 +60,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.2.1
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats-server/v2 v2.10.5
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5
@@ -272,7 +272,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
@@ -331,7 +331,7 @@ require (
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/time v0.4.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.8 // indirect

11
go.sum
View File

@@ -1739,10 +1739,10 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjjTBgaPNus=
github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.5 h1:hhWt6m9ja/mNnm6ixc85jCthDaiUFPaeJI79K/MD980=
github.com/nats-io/nats-server/v2 v2.10.5/go.mod h1:xUMTU4kS//SDkJCSvFwN9SyJ9nUuLhSkzB/Qz0dvjjg=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
@@ -2548,8 +2548,9 @@ golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY=
golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -136,8 +136,12 @@ func ValidateOperatorServiceURL(v string) error {
return nil
case "tls":
return nil
case "ws":
return nil
case "wss":
return nil
default:
return fmt.Errorf("operator service url %q - protocol not supported (only 'nats' or 'tls' only)", v)
return fmt.Errorf("operator service url %q - protocol not supported (only 'nats', 'tls', 'ws', 'wss' only)", v)
}
}

View File

@@ -525,7 +525,7 @@ func processUserPermissionsTemplate(lim jwt.UserPermissionLimits, ujwt *jwt.User
for _, valueList := range nArrayCartesianProduct(tagValues...) {
b := strings.Builder{}
for i, token := range newTokens {
if token == _EMPTY_ {
if token == _EMPTY_ && len(valueList) > 0 {
b.WriteString(valueList[0])
valueList = valueList[1:]
} else {

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2022 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1340,6 +1340,7 @@ func (c *client) readLoop(pre []byte) {
c.Errorf("read error: %v", err)
}
c.closeConnection(closedStateForErr(err))
return
} else if bufs == nil {
continue
}
@@ -1498,15 +1499,6 @@ func (c *client) collapsePtoNB() (net.Buffers, int64) {
return c.out.nb, c.out.pb
}
// This will handle the fixup needed on a partial write.
// Assume pending has been already calculated correctly.
func (c *client) handlePartialWrite(pnb net.Buffers) {
if c.isWebsocket() {
c.ws.frames = append(pnb, c.ws.frames...)
return
}
}
// flushOutbound will flush outbound buffer to a client.
// Will return true if data was attempted to be written.
// Lock must be held
@@ -1677,12 +1669,6 @@ func (c *client) flushOutbound() bool {
c.ws.fs -= n
}
// Check for partial writes
// TODO(dlc) - zero write with no error will cause lost message and the writeloop to spin.
if n != attempted && n > 0 {
c.handlePartialWrite(c.out.nb)
}
// Check that if there is still data to send and writeLoop is in wait,
// then we need to signal.
if c.out.pb > 0 {
@@ -2755,7 +2741,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw
return sub, nil
}
if err := c.addShadowSubscriptions(acc, sub); err != nil {
if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
c.Errorf(err.Error())
}
@@ -2782,10 +2768,13 @@ type ime struct {
dyn bool
}
// If the client's account has stream imports and there are matches for
// this subscription's subject, then add shadow subscriptions in the
// other accounts that export this subject.
func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
// If the client's account has stream imports and there are matches for this
// subscription's subject, then add shadow subscriptions in the other accounts
// that export this subject.
//
// enact=false allows MQTT clients to get the list of shadow subscriptions
// without enacting them, in order to first obtain matching "retained" messages.
func (c *client) addShadowSubscriptions(acc *Account, sub *subscription, enact bool) error {
if acc == nil {
return ErrMissingAccount
}
@@ -2888,7 +2877,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
for i := 0; i < len(ims); i++ {
ime := &ims[i]
// We will create a shadow subscription.
nsub, err := c.addShadowSub(sub, ime)
nsub, err := c.addShadowSub(sub, ime, enact)
if err != nil {
return err
}
@@ -2905,7 +2894,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
}
// Add in the shadow subscription.
func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error) {
func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscription, error) {
im := ime.im
nsub := *sub // copy
nsub.im = im
@@ -2929,6 +2918,11 @@ func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error
}
}
// Else use original subject
if !enact {
return &nsub, nil
}
c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name)
if err := im.acc.sl.Insert(&nsub); err != nil {
@@ -3298,9 +3292,12 @@ func (c *client) stalledWait(producer *client) {
c.mu.Unlock()
defer c.mu.Lock()
delay := time.NewTimer(ttl)
defer delay.Stop()
select {
case <-stall:
case <-time.After(ttl):
case <-delay.C:
producer.Debugf("Timed out of fast producer stall (%v)", ttl)
}
}
@@ -5045,7 +5042,7 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) {
oldShadows := sub.shadow
sub.shadow = nil
c.mu.Unlock()
c.addShadowSubscriptions(acc, sub)
c.addShadowSubscriptions(acc, sub, true)
for _, nsub := range oldShadows {
nsub.im.acc.sl.Remove(nsub)
}

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.10.4"
VERSION = "2.10.5"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -3731,7 +3731,8 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
// How often we will check for ack floor drift.
// Spread these out for large numbers on a server restart.
delta := time.Duration(rand.Int63n(int64(time.Minute)))
var ackFloorCheck = time.Minute + delta
ticker := time.NewTicker(time.Minute + delta)
defer ticker.Stop()
for {
select {
@@ -3746,7 +3747,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
if hasInactiveThresh {
o.suppressDeletion()
}
case <-time.After(ackFloorCheck):
case <-ticker.C:
o.checkAckFloor()
case <-qch:
return

View File

@@ -810,8 +810,26 @@ func (s *Server) sendStatsz(subj string) {
return
}
shouldCheckInterest := func() bool {
opts := s.getOpts()
if opts.Cluster.Port != 0 || opts.Gateway.Port != 0 || opts.LeafNode.Port != 0 {
return false
}
// If we are here we have no clustering or gateways and are not a leafnode hub.
// Check for leafnode remotes that connect the system account.
if len(opts.LeafNode.Remotes) > 0 {
sysAcc := s.sys.account.GetName()
for _, r := range opts.LeafNode.Remotes {
if r.LocalAccount == sysAcc {
return false
}
}
}
return true
}
// if we are running standalone, check for interest.
if s.standAloneMode() {
if shouldCheckInterest() {
// Check if we even have interest in this subject.
sacc := s.sys.account
rr := sacc.sl.Match(subj)

View File

File diff suppressed because it is too large Load Diff

View File

@@ -1197,6 +1197,11 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
fis, _ := os.ReadDir(sdir)
for _, fi := range fis {
mdir := filepath.Join(sdir, fi.Name())
// Check for partially deleted streams. They are marked with "." prefix.
if strings.HasPrefix(fi.Name(), tsep) {
go os.RemoveAll(mdir)
continue
}
key := sha256.Sum256([]byte(fi.Name()))
hh, err := highwayhash.New64(key[:])
if err != nil {

View File

@@ -2143,7 +2143,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// from underneath the one that is running since it will be the same raft node.
defer n.Stop()
qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
s.Debugf("Starting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
defer s.Debugf("Exiting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
@@ -2249,7 +2249,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
startDirectAccessMonitoring := func() {
if dat == nil {
dat = time.NewTicker(1 * time.Second)
dat = time.NewTicker(2 * time.Second)
datc = dat.C
}
}
@@ -2301,6 +2301,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
select {
case <-s.quitCh:
return
case <-mqch:
return
case <-qch:
return
case <-aq.ch:
@@ -2322,6 +2324,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
ne, nb = n.Applied(ce.Index)
ce.ReturnToPool()
} else {
// Our stream was closed out from underneath of us, simply return here.
if err == errStreamClosed {
return
}
s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err)
if isClusterResetErr(err) {
if mset.isMirror() && mset.IsLeader() {
@@ -2349,19 +2355,15 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
case isLeader = <-lch:
if isLeader {
if mset != nil && n != nil {
// Send a snapshot if being asked or if we are tracking
// a failed state so that followers sync.
if clfs := mset.clearCLFS(); clfs > 0 || sendSnapshot {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
if mset != nil && n != nil && sendSnapshot {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
if isRestore {
acc, _ := s.LookupAccount(sa.Client.serviceAccount())
restoreDoneCh = s.processStreamRestore(sa.Client, acc, sa.Config, _EMPTY_, sa.Reply, _EMPTY_)
continue
} else if n.NeedSnapshot() {
} else if n != nil && n.NeedSnapshot() {
doSnapshot()
}
// Always cancel if this was running.
@@ -2388,17 +2390,22 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Here we are checking if we are not the leader but we have been asked to allow
// direct access. We now allow non-leaders to participate in the queue group.
if !isLeader && mset != nil {
startDirectAccessMonitoring()
mset.mu.RLock()
ad, md := mset.cfg.AllowDirect, mset.cfg.MirrorDirect
mset.mu.RUnlock()
if ad || md {
startDirectAccessMonitoring()
}
}
case <-datc:
if mset == nil || isRecovering {
return
continue
}
// If we are leader we can stop, we know this is setup now.
if isLeader {
stopDirectMonitoring()
return
continue
}
mset.mu.Lock()
@@ -2550,6 +2557,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
mset.setStreamAssignment(sa)
// Make sure to update our updateC which would have been nil.
uch = mset.updateC()
// Also update our mqch
mqch = mset.monitorQuitC()
}
}
if err != nil {
@@ -2782,6 +2791,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Grab last sequence and CLFS.
last, clfs := mset.lastSeqAndCLFS()
// We can skip if we know this is less than what we already have.
if lseq-clfs < last {
s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
@@ -2812,13 +2822,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Process the actual message here.
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
// Only return in place if we are going to reset stream or we are out of space.
if isClusterResetErr(err) || isOutOfSpaceErr(err) {
// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed {
return err
}
s.Debugf("Apply stream entries for '%s > %s' got error processing message: %v",
mset.account(), mset.name(), err)
}
case deleteMsgOp:
md, err := decodeMsgDelete(buf[1:])
if err != nil {
@@ -5950,10 +5961,13 @@ func sysRequest[T any](s *Server, subjFormat string, args ...interface{}) (*T, e
}
}()
ttl := time.NewTimer(2 * time.Second)
defer ttl.Stop()
select {
case <-s.quitCh:
return nil, errReqSrvExit
case <-time.After(2 * time.Second):
case <-ttl.C:
return nil, errReqTimeout
case data := <-results:
return data, nil
@@ -6086,6 +6100,12 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
if isReplicaChange {
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
// Check that we have the allocation available.
if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil {
resp.Error = err
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
// Check if we do not have a cluster assigned, and if we do not make sure we
// try to pick one. This could happen with older streams that were assigned by
// previous servers.
@@ -6957,7 +6977,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// Also short circuit if DeliverLastPerSubject is set with no FilterSubject.
if cfg.DeliverPolicy == DeliverLastPerSubject {
if cfg.FilterSubject == _EMPTY_ {
if cfg.FilterSubject == _EMPTY_ && len(cfg.FilterSubjects) == 0 {
resp.Error = NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but FilterSubject is not set"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
@@ -7382,7 +7402,7 @@ func (mset *stream) stateSnapshotLocked() []byte {
Bytes: state.Bytes,
FirstSeq: state.FirstSeq,
LastSeq: state.LastSeq,
Failed: mset.clfs,
Failed: mset.getCLFS(),
Deleted: state.Deleted,
}
b, _ := json.Marshal(snap)
@@ -7419,7 +7439,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.mu.RLock()
canRespond := !mset.cfg.NoAck && len(reply) > 0
name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
name, stype := mset.cfg.Name, mset.cfg.Storage
s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
@@ -7519,26 +7539,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// Some header checks can be checked pre proposal. Most can not.
if len(hdr) > 0 {
// For CAS operations, e.g. ExpectedLastSeqPerSubject, we can also check here and not have to go through.
// Can only precheck for seq != 0.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 {
var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(subject, &smv)
if sm != nil {
fseq = sm.seq
}
if err != nil || fseq != seq {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(fseq)
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
}
}
// Expected stream name can also be pre-checked.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
if canRespond {
@@ -7746,8 +7746,8 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
mset.mu.Lock()
var state StreamState
mset.clfs = snap.Failed
mset.store.FastState(&state)
mset.setCLFS(snap.Failed)
sreq := mset.calculateSyncRequest(&state, snap)
s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node

View File

@@ -25,6 +25,7 @@ import (
"net/http"
"net/url"
"os"
"path"
"reflect"
"regexp"
"runtime"
@@ -2349,7 +2350,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
// Only add in shadow subs if a new sub or qsub.
if osub == nil {
if err := c.addShadowSubscriptions(acc, sub); err != nil {
if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
c.Errorf(err.Error())
}
}
@@ -2784,14 +2785,16 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
// create a LEAF connection, not a CLIENT.
// In case we use the user's URL path in the future, make sure we append the user's
// path to our `/leafnode` path.
path := leafNodeWSPath
lpath := leafNodeWSPath
if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
if curPath[0] == '/' {
curPath = curPath[1:]
}
path += curPath
lpath = path.Join(curPath, lpath)
} else {
lpath = lpath[1:]
}
ustr := fmt.Sprintf("%s://%s%s", scheme, rURL.Host, path)
ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
u, _ := url.Parse(ustr)
req := &http.Request{
Method: "GET",

View File

@@ -129,7 +129,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxMsgs
}
}
if ms.cfg.MaxBytes > 0 && ms.state.Bytes+uint64(len(msg)+len(hdr)) >= uint64(ms.cfg.MaxBytes) {
if ms.cfg.MaxBytes > 0 && ms.state.Bytes+memStoreMsgSize(subj, hdr, msg) >= uint64(ms.cfg.MaxBytes) {
if !asl {
return ErrMaxBytes
}
@@ -138,7 +138,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < uint64(len(msg)+len(hdr)) {
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) {
return ErrMaxBytes
}
}

View File

@@ -155,6 +155,7 @@ const (
// while "$MQTT.JSA.<node id>.SL.<number>" is for a stream lookup, etc...
mqttJSAIdTokenPos = 3
mqttJSATokenPos = 4
mqttJSAClientIDPos = 5
mqttJSAStreamCreate = "SC"
mqttJSAStreamUpdate = "SU"
mqttJSAStreamLookup = "SL"
@@ -237,10 +238,9 @@ type mqttAccountSessionManager struct {
sl *Sublist // sublist allowing to find retained messages for given subscription
retmsgs map[string]*mqttRetainedMsgRef // retained messages
jsa mqttJSA
rrmLastSeq uint64 // Restore retained messages expected last sequence
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
sp *ipQueue[uint64] // Used for cluster-wide processing of session records being persisted
domainTk string // Domain (with trailing "."), or possibly empty. This is added to session subject.
rrmLastSeq uint64 // Restore retained messages expected last sequence
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
domainTk string // Domain (with trailing "."), or possibly empty. This is added to session subject.
}
type mqttJSA struct {
@@ -1109,7 +1109,6 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
nuid: nuid.New(),
quitCh: quitCh,
},
sp: newIPQueue[uint64](s, qname+"sp"),
}
// TODO record domain name in as here
@@ -1170,14 +1169,15 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// This is a subscription that will process all JS API replies. We could split to
// individual subscriptions if needed, but since there is a bit of common code,
// that seemed like a good idea to be all in one place.
if err := as.createSubscription(jsa.rplyr+"*.*",
if err := as.createSubscription(jsa.rplyr+">",
as.processJSAPIReplies, &sid, &subs); err != nil {
return nil, err
}
// We will listen for replies to session persist requests so that we can
// detect the use of a session with the same client ID anywhere in the cluster.
if err := as.createSubscription(mqttJSARepliesPrefix+"*."+mqttJSASessPersist+".*",
// `$MQTT.JSA.{js-id}.SP.{client-id-hash}.{uuid}`
if err := as.createSubscription(mqttJSARepliesPrefix+"*."+mqttJSASessPersist+".*.*",
as.processSessionPersist, &sid, &subs); err != nil {
return nil, err
}
@@ -1203,12 +1203,6 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
as.sendJSAPIrequests(s, c, accName, closeCh)
})
// Start the go routine that will handle network updates regarding sessions
s.startGoRoutine(func() {
defer s.grWG.Done()
as.sessPersistProcessing(closeCh)
})
lookupStream := func(stream, txt string) (*StreamInfo, error) {
si, err := jsa.lookupStream(stream)
if err != nil {
@@ -1407,9 +1401,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
}
if lastSeq > 0 {
ttl := time.NewTimer(mqttJSAPITimeout)
defer ttl.Stop()
select {
case <-rmDoneCh:
case <-time.After(mqttJSAPITimeout):
case <-ttl.C:
s.Warnf("Timing out waiting to load %v retained messages", st.Msgs)
case <-quitCh:
return nil, ErrServerNotRunning
@@ -1454,7 +1451,7 @@ func (s *Server) mqttDetermineReplicas() int {
//////////////////////////////////////////////////////////////////////////////
func (jsa *mqttJSA) newRequest(kind, subject string, hdr int, msg []byte) (interface{}, error) {
return jsa.newRequestEx(kind, subject, hdr, msg, mqttJSAPITimeout)
return jsa.newRequestEx(kind, subject, _EMPTY_, hdr, msg, mqttJSAPITimeout)
}
func (jsa *mqttJSA) prefixDomain(subject string) string {
@@ -1467,19 +1464,24 @@ func (jsa *mqttJSA) prefixDomain(subject string) string {
return subject
}
func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, timeout time.Duration) (interface{}, error) {
func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []byte, timeout time.Duration) (interface{}, error) {
var sb strings.Builder
jsa.mu.Lock()
// Either we use nuid.Next() which uses a global lock, or our own nuid object, but
// then it needs to be "write" protected. This approach will reduce across account
// contention since we won't use the global nuid's lock.
var sb strings.Builder
sb.WriteString(jsa.rplyr)
sb.WriteString(kind)
sb.WriteByte(btsep)
if cidHash != _EMPTY_ {
sb.WriteString(cidHash)
sb.WriteByte(btsep)
}
sb.WriteString(jsa.nuid.Next())
reply := sb.String()
jsa.mu.Unlock()
reply := sb.String()
ch := make(chan interface{}, 1)
jsa.replies.Store(reply, ch)
@@ -1646,6 +1648,25 @@ func (jsa *mqttJSA) storeMsgWithKind(kind, subject string, headers int, msg []by
return smr, smr.ToError()
}
// storeSessionMsg sends a mqttJSASessPersist request carrying msg (with hdr
// passed through as the header argument) on the session subject built from
// mqttSessStreamSubjectPrefix, domainTk and cidHash, using mqttJSAPITimeout
// as the request timeout.
//
// It returns the reply as a *JSPubAckResponse together with the result of its
// ToError method. If the request itself fails, it returns nil and that error.
func (jsa *mqttJSA) storeSessionMsg(domainTk, cidHash string, hdr int, msg []byte) (*JSPubAckResponse, error) {
// Compute subject where the session is being stored
subject := mqttSessStreamSubjectPrefix + domainTk + cidHash
// Passing cidHash will add it to the JS reply subject, so that we can use
// it in processSessionPersist.
smri, err := jsa.newRequestEx(mqttJSASessPersist, subject, cidHash, hdr, msg, mqttJSAPITimeout)
if err != nil {
return nil, err
}
// Unchecked type assertion: this panics if the reply is not a
// *JSPubAckResponse.
// NOTE(review): presumably replies of kind mqttJSASessPersist are always
// decoded to that type by the reply handler - confirm.
smr := smri.(*JSPubAckResponse)
return smr, smr.ToError()
}
// loadSessionMsg builds the session subject from mqttSessStreamSubjectPrefix,
// domainTk and cidHash (the same subject layout storeSessionMsg writes to)
// and delegates to jsa.loadLastMsgFor on the mqttSessStreamName stream,
// returning its result unchanged.
func (jsa *mqttJSA) loadSessionMsg(domainTk, cidHash string) (*StoredMsg, error) {
subject := mqttSessStreamSubjectPrefix + domainTk + cidHash
return jsa.loadLastMsgFor(mqttSessStreamName, subject)
}
func (jsa *mqttJSA) deleteMsg(stream string, seq uint64, wait bool) error {
dreq := JSApiMsgDeleteRequest{Seq: seq, NoErase: true}
req, _ := json.Marshal(dreq)
@@ -1817,6 +1838,7 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc *
if tokenAt(subject, mqttJSAIdTokenPos) == as.jsa.id {
return
}
cIDHash := tokenAt(subject, mqttJSAClientIDPos)
_, msg := pc.msgParts(rmsg)
if len(msg) < LEN_CR_LF {
return
@@ -1839,18 +1861,6 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc *
if ignore {
return
}
// We would need to lookup the message and that would be a request/reply,
// which we can't do in place here. So move that to a long-running routine
// that will process the session persist record.
as.sp.push(par.Sequence)
}
func (as *mqttAccountSessionManager) processSessPersistRecord(seq uint64) {
smsg, err := as.jsa.loadMsg(mqttSessStreamName, seq)
if err != nil {
return
}
cIDHash := strings.TrimPrefix(smsg.Subject, mqttSessStreamSubjectPrefix+as.domainTk)
as.mu.Lock()
defer as.mu.Unlock()
@@ -1861,7 +1871,7 @@ func (as *mqttAccountSessionManager) processSessPersistRecord(seq uint64) {
// If our current session's stream sequence is higher, it means that this
// update is stale, so we don't do anything here.
sess.mu.Lock()
ignore := seq < sess.seq
ignore = par.Sequence < sess.seq
sess.mu.Unlock()
if ignore {
return
@@ -1881,28 +1891,6 @@ func (as *mqttAccountSessionManager) processSessPersistRecord(seq uint64) {
sess.mu.Unlock()
}
func (as *mqttAccountSessionManager) sessPersistProcessing(closeCh chan struct{}) {
as.mu.RLock()
sp := as.sp
quitCh := as.jsa.quitCh
as.mu.RUnlock()
for {
select {
case <-sp.ch:
seqs := sp.pop()
for _, seq := range seqs {
as.processSessPersistRecord(seq)
}
sp.recycle(&seqs)
case <-closeCh:
return
case <-quitCh:
return
}
}
}
// Adds this client ID to the flappers map, and if needed start the timer
// for map cleanup.
//
@@ -2176,6 +2164,30 @@ func (as *mqttAccountSessionManager) removeSession(sess *mqttSession, lock bool)
}
}
// processSub is a helper that registers a subscription for the client through
// c.processSub (with handler h) and then sets the subscription's MQTT fields:
// qos, the reserved flag and the JetStream durable name.
//
// When initShadow is true, the same fields are also set on every shadow
// subscription attached to the returned subscription. A missing mqtt struct
// is allocated on demand.
//
// NOTE(review): this function does not itself serialize retained messages;
// presumably the caller does that with the pre-loaded messages - confirm.
//
// Session lock held on entry.
func (sess *mqttSession) processSub(c *client, subject, sid []byte, isReserved bool, qos byte, jsDurName string, h msgHandler, initShadow bool) (*subscription, error) {
sub, err := c.processSub(subject, nil, sid, h, false)
if err != nil {
// c.processSub already called c.Errorf(), so no need here.
return nil, err
}
// Always update the main subscription; include its shadow subscriptions
// only when the caller asked for them to be initialized.
subs := []*subscription{sub}
if initShadow {
subs = append(subs, sub.shadow...)
}
for _, ss := range subs {
if ss.mqtt == nil {
ss.mqtt = &mqttSub{}
}
ss.mqtt.qos = qos
ss.mqtt.reserved = isReserved
ss.mqtt.jsDur = jsDurName
}
return sub, nil
}
// Process subscriptions for the given session/client.
//
// When `fromSubProto` is false, it means that this is invoked from the CONNECT
@@ -2193,14 +2205,85 @@ func (as *mqttAccountSessionManager) removeSession(sess *mqttSession, lock bool)
func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
filters []*mqttFilter, fromSubProto, trace bool) ([]*subscription, error) {
// Helpers to lock/unlock both account manager and session.
asAndSessLock := func() {
as.mu.Lock()
sess.mu.Lock()
// Helper to determine if we need to create a separate top-level
// subscription for a wildcard.
fwc := func(subject string) (bool, string, string) {
if !mqttNeedSubForLevelUp(subject) {
return false, _EMPTY_, _EMPTY_
}
// Say subject is "foo.>", remove the ".>" so that it becomes "foo"
fwcsubject := subject[:len(subject)-2]
// Change the sid to "foo fwc"
fwcsid := fwcsubject + mqttMultiLevelSidSuffix
return true, fwcsubject, fwcsid
}
asAndSessUnlock := func() {
sess.mu.Unlock()
as.mu.Unlock()
// Cache and a helper to load retained messages for a given subject.
rms := make(map[string]*mqttRetainedMsg)
loadRMS := func(subject []byte) error {
sub := &subscription{
client: c,
subject: subject,
sid: subject,
}
c.mu.Lock()
acc := c.acc
c.mu.Unlock()
if err := c.addShadowSubscriptions(acc, sub, false); err != nil {
return err
}
// Best-effort loading the messages, logs on errors (to c.srv), loads
// once for subject.
as.loadRetainedMessagesForSubject(rms, subject, c.srv)
for _, ss := range sub.shadow {
as.loadRetainedMessagesForSubject(rms, ss.subject, c.srv)
}
return nil
}
// Preload retained messages for all requested subscriptions. Also, since
// it's the first iteration over the filter list, do some cleanup.
for _, f := range filters {
if f.qos > 2 {
f.qos = 2
}
if c.mqtt.downgradeQoS2Sub && f.qos == 2 {
c.Warnf("Downgrading subscription QoS2 to QoS1 for %q, as configured", f.filter)
f.qos = 1
}
// Do not allow subscribing to our internal subjects.
//
// TODO: (levb: not sure why since one can subscribe to `#` and it'll
// include everything; I guess this would discourage? Otherwise another
// candidate for DO NOT DELIVER prefix list).
if strings.HasPrefix(f.filter, mqttSubPrefix) {
f.qos = mqttSubAckFailure
continue
}
if f.qos == 2 {
if err := sess.ensurePubRelConsumerSubscription(c); err != nil {
c.Errorf("failed to initialize PUBREL processing: %v", err)
f.qos = mqttSubAckFailure
continue
}
}
// Load retained messages.
if fromSubProto {
if err := loadRMS([]byte(f.filter)); err != nil {
f.qos = mqttSubAckFailure
continue
}
if need, subject, _ := fwc(f.filter); need {
if err := loadRMS([]byte(subject)); err != nil {
f.qos = mqttSubAckFailure
continue
}
}
}
}
// Small helper to add the consumer config to the session.
@@ -2214,90 +2297,80 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
sess.cons[sid] = cc
}
// Helper that sets the sub's mqtt fields and possibly serialize retained messages.
// Assumes account manager and session lock held.
setupSub := func(sub *subscription, qos byte) {
subs := []*subscription{sub}
if len(sub.shadow) > 0 {
subs = append(subs, sub.shadow...)
}
for _, sub := range subs {
if sub.mqtt == nil {
sub.mqtt = &mqttSub{}
}
sub.mqtt.qos = qos
sub.mqtt.reserved = isMQTTReservedSubscription(string(sub.subject))
if fromSubProto {
as.serializeRetainedMsgsForSub(sess, c, sub, trace)
}
serializeRMS := func(sub *subscription) {
for _, ss := range append([]*subscription{sub}, sub.shadow...) {
as.serializeRetainedMsgsForSub(rms, sess, c, ss, trace)
}
}
var err error
subs := make([]*subscription, 0, len(filters))
for _, f := range filters {
if f.qos > 2 {
f.qos = 2
}
if c.mqtt.downgradeQoS2Sub && f.qos == 2 {
c.Warnf("Downgrading subscription QoS2 to QoS1 for %q, as configured", f.filter)
f.qos = 1
}
subject := f.filter
sid := subject
if strings.HasPrefix(subject, mqttSubPrefix) {
f.qos = mqttSubAckFailure
// Skip what's already been identified as a failure.
if f.qos == mqttSubAckFailure {
continue
}
subject := f.filter
bsubject := []byte(subject)
sid := subject
bsid := bsubject
var jscons *ConsumerConfig
var jssub *subscription
// Note that if a subscription already exists on this subject,
// the existing sub is returned. Need to update the qos.
asAndSessLock()
sub, err := c.processSub([]byte(subject), nil, []byte(sid), mqttDeliverMsgCbQoS0, false)
if err == nil {
setupSub(sub, f.qos)
}
if f.qos == 2 {
err = sess.ensurePubRelConsumerSubscription(c)
}
asAndSessUnlock()
if err == nil {
// This will create (if not already exist) a JS consumer for subscriptions
// of QoS >= 1. But if a JS consumer already exists and the subscription
// for same subject is now a QoS==0, then the JS consumer will be deleted.
jscons, jssub, err = sess.processJSConsumer(c, subject, sid, f.qos, fromSubProto)
// Note that if a subscription already exists on this subject, the
// existing sub is returned. Need to update the qos.
as.mu.Lock()
sess.mu.Lock()
sub, err := sess.processSub(c, bsubject, bsid,
isMQTTReservedSubscription(subject), f.qos, _EMPTY_, mqttDeliverMsgCbQoS0, true)
if err == nil && fromSubProto {
serializeRMS(sub)
}
sess.mu.Unlock()
as.mu.Unlock()
if err != nil {
// c.processSub already called c.Errorf(), so no need here.
f.qos = mqttSubAckFailure
sess.cleanupFailedSub(c, sub, jscons, jssub)
continue
}
if mqttNeedSubForLevelUp(subject) {
// This will create (if not already exist) a JS consumer for
// subscriptions of QoS >= 1. But if a JS consumer already exists and
// the subscription for same subject is now a QoS==0, then the JS
// consumer will be deleted.
jscons, jssub, err = sess.processJSConsumer(c, subject, sid, f.qos, fromSubProto)
if err != nil {
f.qos = mqttSubAckFailure
sess.cleanupFailedSub(c, sub, jscons, jssub)
continue
}
// Process the wildcard subject if needed.
if need, fwcsubject, fwcsid := fwc(subject); need {
var fwjscons *ConsumerConfig
var fwjssub *subscription
var fwcsub *subscription
// Say subject is "foo.>", remove the ".>" so that it becomes "foo"
fwcsubject := subject[:len(subject)-2]
// Change the sid to "foo fwc"
fwcsid := fwcsubject + mqttMultiLevelSidSuffix
// See note above about existing subscription.
asAndSessLock()
fwcsub, err = c.processSub([]byte(fwcsubject), nil, []byte(fwcsid), mqttDeliverMsgCbQoS0, false)
if err == nil {
setupSub(fwcsub, f.qos)
as.mu.Lock()
sess.mu.Lock()
fwcsub, err = sess.processSub(c, []byte(fwcsubject), []byte(fwcsid),
isMQTTReservedSubscription(subject), f.qos, _EMPTY_, mqttDeliverMsgCbQoS0, true)
if err == nil && fromSubProto {
serializeRMS(fwcsub)
}
asAndSessUnlock()
if err == nil {
fwjscons, fwjssub, err = sess.processJSConsumer(c, fwcsubject, fwcsid, f.qos, fromSubProto)
sess.mu.Unlock()
as.mu.Unlock()
if err != nil {
// c.processSub already called c.Errorf(), so no need here.
f.qos = mqttSubAckFailure
sess.cleanupFailedSub(c, sub, jscons, jssub)
continue
}
fwjscons, fwjssub, err = sess.processJSConsumer(c, fwcsubject, fwcsid, f.qos, fromSubProto)
if err != nil {
// c.processSub already called c.Errorf(), so no need here.
f.qos = mqttSubAckFailure
@@ -2305,6 +2378,7 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
sess.cleanupFailedSub(c, fwcsub, fwjscons, fwjssub)
continue
}
subs = append(subs, fwcsub)
addJSConsToSess(fwcsid, fwjscons)
}
@@ -2328,15 +2402,19 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
// Runs from the client's readLoop.
// Account session manager lock held on entry.
// Session lock held on entry.
func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSession, c *client, sub *subscription, trace bool) {
if len(as.retmsgs) == 0 {
func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(rms map[string]*mqttRetainedMsg, sess *mqttSession, c *client, sub *subscription, trace bool) {
if len(as.retmsgs) == 0 || len(rms) == 0 {
return
}
var rmsa [64]*mqttRetainedMsg
rms := rmsa[:0]
as.getRetainedPublishMsgs(string(sub.subject), &rms)
for _, rm := range rms {
result := as.sl.ReverseMatch(string(sub.subject))
if len(result.psubs) == 0 {
return
}
for _, psub := range result.psubs {
rm, ok := rms[string(psub.subject)]
if !ok {
continue
}
if sub.mqtt.prm == nil {
sub.mqtt.prm = &mqttWriter{}
}
@@ -2379,23 +2457,36 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSessi
// Returns in the provided slice all publish retained message records that
// match the given subscription's `subject` (which could have wildcards).
//
// Account session manager lock held on entry.
func (as *mqttAccountSessionManager) getRetainedPublishMsgs(subject string, rms *[]*mqttRetainedMsg) {
result := as.sl.ReverseMatch(subject)
// Account session manager lock NOT held on entry.
func (as *mqttAccountSessionManager) loadRetainedMessagesForSubject(rms map[string]*mqttRetainedMsg, topSubject []byte, log Logger) {
as.mu.RLock()
if len(as.retmsgs) == 0 {
as.mu.RUnlock()
return
}
result := as.sl.ReverseMatch(string(topSubject))
as.mu.RUnlock()
if len(result.psubs) == 0 {
return
}
for _, sub := range result.psubs {
subj := mqttRetainedMsgsStreamSubject + string(sub.subject)
jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, subj)
subject := string(sub.subject)
if rms[subject] != nil {
continue // already loaded
}
loadSubject := mqttRetainedMsgsStreamSubject + subject
jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, loadSubject)
if err != nil || jsm == nil {
log.Warnf("failed to load retained message for subject %q: %v", loadSubject, err)
continue
}
var rm mqttRetainedMsg
if err := json.Unmarshal(jsm.Data, &rm); err != nil {
log.Warnf("failed to decode retained message for subject %q: %v", loadSubject, err)
continue
}
*rms = append(*rms, &rm)
rms[subject] = &rm
}
}
@@ -2417,8 +2508,7 @@ func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opt
}
hash := getHash(clientID)
subject := mqttSessStreamSubjectPrefix + as.domainTk + hash
smsg, err := jsa.loadLastMsgFor(mqttSessStreamName, subject)
smsg, err := jsa.loadSessionMsg(as.domainTk, hash)
if err != nil {
if isErrorOtherThan(err, JSNoMessageFoundErr) {
return formatError("loading session record", err)
@@ -2434,6 +2524,7 @@ func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opt
if err := json.Unmarshal(smsg.Data, ps); err != nil {
return formatError(fmt.Sprintf("unmarshal of session record at sequence %v", smsg.Sequence), err)
}
// Restore this session (even if we don't own it), the caller will do the right thing.
sess := mqttSessionCreate(jsa, clientID, hash, smsg.Sequence, opts)
sess.domainTk = as.domainTk
@@ -2479,7 +2570,7 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve
}()
jsa := &as.jsa
sni, err := jsa.newRequestEx(mqttJSAStreamNames, JSApiStreams, 0, nil, 5*time.Second)
sni, err := jsa.newRequestEx(mqttJSAStreamNames, JSApiStreams, _EMPTY_, 0, nil, 5*time.Second)
if err != nil {
log.Errorf("Unable to transfer MQTT session streams: %v", err)
return
@@ -2514,10 +2605,8 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve
log.Warnf(" Unable to unmarshal the content of this stream, may not be a legitimate MQTT session stream, skipping")
continue
}
// Compute subject where the session is being stored
subject := mqttSessStreamSubjectPrefix + as.domainTk + getHash(ps.ID)
// Store record to MQTT session stream
if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil {
if _, err := jsa.storeSessionMsg(as.domainTk, getHash(ps.ID), 0, smsg.Data); err != nil {
log.Errorf(" Unable to transfer the session record: %v", err)
return
}
@@ -2553,7 +2642,8 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
}
// Store the message again, this time with the new per-key subject.
subject := mqttRetainedMsgsStreamSubject + rmsg.Subject
if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil {
if _, err := jsa.storeMsg(subject, 0, smsg.Data); err != nil {
log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err)
errors++
continue
@@ -2619,7 +2709,7 @@ func (sess *mqttSession) save() error {
}
b, _ := json.Marshal(&ps)
subject := mqttSessStreamSubjectPrefix + sess.domainTk + sess.idHash
domainTk, cidHash := sess.domainTk, sess.idHash
seq := sess.seq
sess.mu.Unlock()
@@ -2637,7 +2727,7 @@ func (sess *mqttSession) save() error {
b = bb.Bytes()
}
resp, err := sess.jsa.storeMsgWithKind(mqttJSASessPersist, subject, hdr, b)
resp, err := sess.jsa.storeSessionMsg(domainTk, cidHash, hdr, b)
if err != nil {
return fmt.Errorf("unable to persist session %q (seq=%v): %v", ps.ID, seq, err)
}
@@ -2691,8 +2781,13 @@ func (sess *mqttSession) clear() error {
}
if seq > 0 {
if err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true); err != nil {
return fmt.Errorf("unable to delete session %q record at sequence %v", id, seq)
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true)
// Ignore the various errors indicating that the message (or sequence)
// is already deleted, can happen in a cluster.
if isErrorOtherThan(err, JSSequenceNotFoundErrF) {
if isErrorOtherThan(err, JSStreamMsgDeleteFailedF) || !strings.Contains(err.Error(), ErrStoreMsgNotFound.Error()) {
return fmt.Errorf("unable to delete session %q record at sequence %v: %v", id, seq, err)
}
}
}
return nil
@@ -3149,6 +3244,9 @@ func (c *client) mqttConnectTrace(cp *mqttConnectProto) string {
trace += fmt.Sprintf(" will=(topic=%s QoS=%v retain=%v)",
cp.will.topic, cp.will.qos, cp.will.retain)
}
if cp.flags&mqttConnFlagCleanSession != 0 {
trace += " clean"
}
if c.opts.Username != _EMPTY_ {
trace += fmt.Sprintf(" username=%s", c.opts.Username)
}
@@ -4349,11 +4447,11 @@ func mqttIsReservedSub(sub *subscription, subject string) bool {
// Check if a sub is a reserved wildcard. E.g. '#', '*', or '*/' prefix.
func isMQTTReservedSubscription(subject string) bool {
if len(subject) == 1 && subject[0] == fwc || subject[0] == pwc {
if len(subject) == 1 && (subject[0] == fwc || subject[0] == pwc) {
return true
}
// Match "*.<>"
if len(subject) > 1 && subject[0] == pwc && subject[1] == btsep {
if len(subject) > 1 && (subject[0] == pwc && subject[1] == btsep) {
return true
}
return false
@@ -4467,9 +4565,6 @@ func (sess *mqttSession) cleanupFailedSub(c *client, sub *subscription, cc *Cons
// Make sure we are set up to deliver PUBREL messages to this QoS2-subscribed
// session.
//
// Session lock held on entry. Need to make sure no other subscribe packet races
// to do the same.
func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
opts := c.srv.getOpts()
ackWait := opts.MQTT.AckWait
@@ -4481,21 +4576,32 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
maxAckPending = mqttDefaultMaxAckPending
}
sess.mu.Lock()
pubRelSubscribed := sess.pubRelSubscribed
pubRelSubject := sess.pubRelSubject
pubRelDeliverySubjectB := sess.pubRelDeliverySubjectB
pubRelDeliverySubject := sess.pubRelDeliverySubject
pubRelConsumer := sess.pubRelConsumer
tmaxack := sess.tmaxack
idHash := sess.idHash
id := sess.id
sess.mu.Unlock()
// Subscribe before the consumer is created so we don't lose any messages.
if !sess.pubRelSubscribed {
_, err := c.processSub(sess.pubRelDeliverySubjectB, nil, sess.pubRelDeliverySubjectB,
if !pubRelSubscribed {
_, err := c.processSub(pubRelDeliverySubjectB, nil, pubRelDeliverySubjectB,
mqttDeliverPubRelCb, false)
if err != nil {
c.Errorf("Unable to create subscription for JetStream consumer on %q: %v", sess.pubRelDeliverySubject, err)
c.Errorf("Unable to create subscription for JetStream consumer on %q: %v", pubRelDeliverySubject, err)
return err
}
sess.pubRelSubscribed = true
pubRelSubscribed = true
}
// Create the consumer if needed.
if sess.pubRelConsumer == nil {
if pubRelConsumer == nil {
// Check that the limit of subs' maxAckPending are not going over the limit
if after := sess.tmaxack + maxAckPending; after > mqttMaxAckTotalLimit {
if after := tmaxack + maxAckPending; after > mqttMaxAckTotalLimit {
return fmt.Errorf("max_ack_pending for all consumers would be %v which exceeds the limit of %v",
after, mqttMaxAckTotalLimit)
}
@@ -4503,11 +4609,11 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
ccr := &CreateConsumerRequest{
Stream: mqttOutStreamName,
Config: ConsumerConfig{
DeliverSubject: sess.pubRelDeliverySubject,
Durable: mqttPubRelConsumerDurablePrefix + sess.idHash,
DeliverSubject: pubRelDeliverySubject,
Durable: mqttPubRelConsumerDurablePrefix + idHash,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: sess.pubRelSubject,
FilterSubject: pubRelSubject,
AckWait: ackWait,
MaxAckPending: maxAckPending,
MemoryStorage: opts.MQTT.ConsumerMemoryStorage,
@@ -4517,28 +4623,41 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold
}
if _, err := sess.jsa.createConsumer(ccr); err != nil {
c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", sess.id, err)
c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", id, err)
return err
}
sess.pubRelConsumer = &ccr.Config
sess.tmaxack += maxAckPending
pubRelConsumer = &ccr.Config
tmaxack += maxAckPending
}
sess.mu.Lock()
sess.pubRelSubscribed = pubRelSubscribed
sess.pubRelConsumer = pubRelConsumer
sess.tmaxack = tmaxack
sess.mu.Unlock()
return nil
}
// When invoked with a QoS of 0, looks for an existing JS durable consumer for
// the given sid and if one is found, delete the JS durable consumer and unsub
// the NATS subscription on the delivery subject.
//
// With a QoS > 0, creates or updates the existing JS durable consumer along with
// its NATS subscription on a delivery subject.
//
// Lock not held on entry, but session is in the locked map.
// Session lock is acquired and released as needed. Session is in the locked
// map.
func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
qos byte, fromSubProto bool) (*ConsumerConfig, *subscription, error) {
// Check if we are already a JS consumer for this SID.
sess.mu.Lock()
cc, exists := sess.cons[sid]
tmaxack := sess.tmaxack
idHash := sess.idHash
sess.mu.Unlock()
// Check if we are already a JS consumer for this SID.
if exists {
// If current QoS is 0, it means that we need to delete the existing
// one (that was QoS > 0)
@@ -4547,7 +4666,11 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
// the form: mqttSubPrefix + <nuid>. It is also used as the sid
// for the NATS subscription, so use that for the lookup.
sub := c.subs[cc.DeliverSubject]
sess.mu.Lock()
delete(sess.cons, sid)
sess.mu.Unlock()
sess.deleteConsumer(cc)
if sub != nil {
c.processUnsub(sub.sid)
@@ -4583,12 +4706,12 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
}
// Check that the limit of subs' maxAckPending are not going over the limit
if after := sess.tmaxack + maxAckPending; after > mqttMaxAckTotalLimit {
if after := tmaxack + maxAckPending; after > mqttMaxAckTotalLimit {
return nil, nil, fmt.Errorf("max_ack_pending for all consumers would be %v which exceeds the limit of %v",
after, mqttMaxAckTotalLimit)
}
durName := sess.idHash + "_" + nuid.Next()
durName := idHash + "_" + nuid.Next()
ccr := &CreateConsumerRequest{
Stream: mqttStreamName,
Config: ConsumerConfig{
@@ -4610,25 +4733,22 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
return nil, nil, err
}
cc = &ccr.Config
sess.tmaxack += maxAckPending
tmaxack += maxAckPending
}
// This is an internal subscription on subject like "$MQTT.sub.<nuid>" that is setup
// for the JS durable's deliver subject.
sess.mu.Lock()
sub, err := c.processSub([]byte(inbox), nil, []byte(inbox), mqttDeliverMsgCbQoS12, false)
sess.tmaxack = tmaxack
sub, err := sess.processSub(c, []byte(inbox), []byte(inbox),
isMQTTReservedSubscription(subject), qos, cc.Durable, mqttDeliverMsgCbQoS12, false)
sess.mu.Unlock()
if err != nil {
sess.mu.Unlock()
sess.deleteConsumer(cc)
c.Errorf("Unable to create subscription for JetStream consumer on %q: %v", subject, err)
return nil, nil, err
}
if sub.mqtt == nil {
sub.mqtt = &mqttSub{}
}
sub.mqtt.qos = qos
sub.mqtt.jsDur = cc.Durable
sub.mqtt.reserved = isMQTTReservedSubscription(subject)
sess.mu.Unlock()
return cc, sub, nil
}

View File

@@ -103,7 +103,6 @@ const (
Follower RaftState = iota
Leader
Candidate
Observer
Closed
)
@@ -115,8 +114,6 @@ func (state RaftState) String() string {
return "CANDIDATE"
case Leader:
return "LEADER"
case Observer:
return "OBSERVER"
case Closed:
return "CLOSED"
}
@@ -125,108 +122,105 @@ func (state RaftState) String() string {
type raft struct {
sync.RWMutex
created time.Time
group string
sd string
id string
wal WAL
wtype StorageType
track bool
werr error
created time.Time // Time that the group was created
accName string // Account name of the asset this raft group is for
group string // Raft group
sd string // Store directory
id string // Node ID
wal WAL // WAL store (filestore or memstore)
wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage
track bool //
werr error // Last write error
state atomic.Int32 // RaftState
hh hash.Hash64
snapfile string
csz int
qn int
peers map[string]*lps
removed map[string]struct{}
acks map[uint64]map[string]struct{}
pae map[uint64]*appendEntry
elect *time.Timer
active time.Time
llqrt time.Time
lsut time.Time
term uint64 // The current vote term
pterm uint64 // Previous term from the last snapshot
pindex uint64 // Previous index from the last snapshot
commit uint64 // Sequence number of the most recent commit
applied uint64 // Sequence number of the most recently applied commit
leader string // The ID of the leader
vote string
hash string
s *Server
c *client
js *jetStream
dflag bool
pleader bool
observer bool
extSt extensionState
hh hash.Hash64 // Highwayhash, used for snapshots
snapfile string // Snapshot filename
// Subjects for votes, updates, replays.
psubj string
rpsubj string
vsubj string
vreply string
asubj string
areply string
csz int // Cluster size
qn int // Number of nodes needed to establish quorum
peers map[string]*lps // Other peers in the Raft group
sq *sendq
aesub *subscription
removed map[string]struct{} // Peers that were removed from the group
acks map[uint64]map[string]struct{} // Append entry responses/acks, map of entry index -> peer ID
pae map[uint64]*appendEntry // Pending append entries
// Are we doing a leadership transfer.
lxfer bool
elect *time.Timer // Election timer, normally accessed via electTimer
active time.Time // Last activity time, i.e. for heartbeats
llqrt time.Time // Last quorum lost time
lsut time.Time // Last scale-up time
// For holding term and vote and peerstate to be written.
wtv []byte
wps []byte
wtvch chan struct{}
wpsch chan struct{}
term uint64 // The current vote term
pterm uint64 // Previous term from the last snapshot
pindex uint64 // Previous index from the last snapshot
commit uint64 // Sequence number of the most recent commit
applied uint64 // Sequence number of the most recently applied commit
// For when we need to catch up as a follower.
catchup *catchupState
leader string // The ID of the leader
vote string // Our current vote state
lxfer bool // Are we doing a leadership transfer?
// For leader or server catching up a follower.
progress map[string]*ipQueue[uint64]
s *Server // Reference to top-level server
c *client // Internal client for subscriptions
js *jetStream // JetStream, if running, to see if we are out of resources
// For when we have paused our applyC.
paused bool
hcommit uint64
pobserver bool
dflag bool // Debug flag
pleader bool // Has the group ever had a leader?
observer bool // The node is observing, i.e. not participating in voting
extSt extensionState // Extension state
// Queues and Channels
prop *ipQueue[*Entry]
entry *ipQueue[*appendEntry]
resp *ipQueue[*appendEntryResponse]
apply *ipQueue[*CommittedEntry]
reqs *ipQueue[*voteRequest]
votes *ipQueue[*voteResponse]
stepdown *ipQueue[string]
leadc chan bool
quit chan struct{}
psubj string // Proposals subject
rpsubj string // Remove peers subject
vsubj string // Vote requests subject
vreply string // Vote responses subject
asubj string // Append entries subject
areply string // Append entries responses subject
// Account name of the asset this raft group is for
accName string
sq *sendq // Send queue for outbound RPC messages
aesub *subscription // Subscription for handleAppendEntry callbacks
// Random generator, used to generate inboxes for instance
prand *rand.Rand
wtv []byte // Term and vote to be written
wps []byte // Peer state to be written
wtvch chan struct{} // Signals when a term vote was just written, to kick file writer
wpsch chan struct{} // Signals when a peer state was just written, to kick file writer
catchup *catchupState // For when we need to catch up as a follower.
progress map[string]*ipQueue[uint64] // For leader or server catching up a follower.
paused bool // Whether or not applies are paused
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused
prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
stepdown *ipQueue[string] // Stepdown requests
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
prand *rand.Rand // Random generator, used to generate inboxes for instance
}
// catchupState structure that holds our subscription, and catchup term and index
// as well as starting term and index and how many updates we have seen.
type catchupState struct {
sub *subscription
cterm uint64
cindex uint64
pterm uint64
pindex uint64
active time.Time
sub *subscription // Subscription that catchup messages will arrive on
cterm uint64 // Catchup term
cindex uint64 // Catchup index
pterm uint64 // Starting term
pindex uint64 // Starting index
active time.Time // Last time we received a message for this catchup
}
// lps holds peer state of last time and last index replicated.
type lps struct {
ts int64
li uint64
kp bool // marks as known peer.
ts int64 // Last timestamp
li uint64 // Last index replicated
kp bool // Known peer
}
const (
@@ -237,7 +231,6 @@ const (
hbIntervalDefault = 1 * time.Second
lostQuorumIntervalDefault = hbIntervalDefault * 10 // 10 seconds
lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds
)
var (
@@ -383,7 +376,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
track: cfg.Track,
csz: ps.clusterSize,
qn: ps.clusterSize/2 + 1,
hash: hash,
peers: make(map[string]*lps),
acks: make(map[uint64]map[string]struct{}),
pae: make(map[uint64]*appendEntry),
@@ -413,14 +405,19 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
n.dflag = true
}
// Set up the highwayhash for the snapshots.
key := sha256.Sum256([]byte(n.group))
n.hh, _ = highwayhash.New64(key[:])
// If we have a term and vote file (tav.idx on the filesystem) then read in
// what we think the term and vote was. It's possible these are out of date
// so a catch-up may be required.
if term, vote, err := n.readTermVote(); err == nil && term > 0 {
n.term = term
n.vote = vote
}
// Make sure that the snapshots directory exists.
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), 0750); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}
@@ -433,6 +430,9 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
n.setupLastSnapshot()
}
// Retrieve the stream state from the WAL. If there are pending append
// entries that were committed but not applied before we last shut down,
// we will try to replay them and process them here.
var state StreamState
n.wal.FastState(&state)
if state.Msgs > 0 {
@@ -444,6 +444,8 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
}
}
// It looks like there are entries we have committed but not applied
// yet. Replay them.
for index := state.FirstSeq; index <= state.LastSeq; index++ {
ae, err := n.loadEntry(index)
if err != nil {
@@ -469,15 +471,18 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
// Make sure to track ourselves.
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}
// Track known peers
for _, peer := range ps.knownPeers {
// Set these to 0 to start but mark as known peer.
if peer != n.id {
// Set these to 0 to start but mark as known peer.
n.peers[peer] = &lps{0, 0, true}
}
}
// Setup our internal subscriptions.
// Setup our internal subscriptions for proposals, votes and append entries.
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
if err := n.createInternalSubs(); err != nil {
n.shutdown(true)
return nil, err
@@ -486,18 +491,26 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
n.debug("Started")
// Check if we need to start in observer mode due to lame duck status.
// This will stop us from taking on the leader role when we're about to
// shutdown anyway.
if s.isLameDuckMode() {
n.debug("Will start in observer mode due to lame duck status")
n.SetObserver(true)
}
// Set the election timer and lost quorum timers to now, so that we
// won't accidentally trigger either state without knowing the real state
// of the other nodes.
n.Lock()
n.resetElectionTimeout()
n.llqrt = time.Now()
n.Unlock()
// Register the Raft group.
labels["group"] = n.group
s.registerRaftNode(n.group, n)
// Start the goroutines for the Raft state machine and the file writer.
s.startGoRoutine(n.run, labels)
s.startGoRoutine(n.fileWriter)
@@ -529,7 +542,8 @@ func (s *Server) clusterNameForNode(node string) string {
return _EMPTY_
}
// Server will track all raft nodes.
// Registers the Raft node with the server, as it will track all of the Raft
// nodes.
func (s *Server) registerRaftNode(group string, n RaftNode) {
s.rnMu.Lock()
defer s.rnMu.Unlock()
@@ -539,6 +553,7 @@ func (s *Server) registerRaftNode(group string, n RaftNode) {
s.raftNodes[group] = n
}
// Unregisters the Raft node from the server, i.e. at shutdown.
func (s *Server) unregisterRaftNode(group string) {
s.rnMu.Lock()
defer s.rnMu.Unlock()
@@ -547,12 +562,15 @@ func (s *Server) unregisterRaftNode(group string) {
}
}
// numRaftNodes returns how many Raft nodes are currently tracked by this
// server instance.
func (s *Server) numRaftNodes() int {
	// Only the length of the map is read here, so a shared (read) lock is
	// sufficient and keeps this consistent with the other read-only accessors
	// of s.raftNodes.
	s.rnMu.RLock()
	defer s.rnMu.RUnlock()
	return len(s.raftNodes)
}
// Finds the Raft node for a given Raft group, if any. If there is no Raft node
// running for this group then it can return nil.
func (s *Server) lookupRaftNode(group string) RaftNode {
s.rnMu.RLock()
defer s.rnMu.RUnlock()
@@ -563,6 +581,8 @@ func (s *Server) lookupRaftNode(group string) RaftNode {
return n
}
// Reloads the debug state for all running Raft nodes. This is necessary when
// the configuration has been reloaded and the debug log level has changed.
func (s *Server) reloadDebugRaftNodes(debug bool) {
if s == nil {
return
@@ -577,15 +597,19 @@ func (s *Server) reloadDebugRaftNodes(debug bool) {
s.rnMu.RUnlock()
}
// Requests that all Raft nodes on this server step down and place them into
// observer mode. This is called when the server is shutting down.
func (s *Server) stepdownRaftNodes() {
if s == nil {
return
}
var nodes []RaftNode
s.rnMu.RLock()
if len(s.raftNodes) > 0 {
s.Debugf("Stepping down all leader raft nodes")
if len(s.raftNodes) == 0 {
s.rnMu.RUnlock()
return
}
s.Debugf("Stepping down all leader raft nodes")
nodes := make([]RaftNode, 0, len(s.raftNodes))
for _, n := range s.raftNodes {
nodes = append(nodes, n)
}
@@ -599,15 +623,20 @@ func (s *Server) stepdownRaftNodes() {
}
}
// Shuts down all Raft nodes on this server. This is called when the server
// is entering lame duck mode, shutting down or when JetStream has been
// disabled.
func (s *Server) shutdownRaftNodes() {
if s == nil {
return
}
var nodes []RaftNode
s.rnMu.RLock()
if len(s.raftNodes) > 0 {
s.Debugf("Shutting down all raft nodes")
if len(s.raftNodes) == 0 {
s.rnMu.RUnlock()
return
}
nodes := make([]RaftNode, 0, len(s.raftNodes))
s.Debugf("Shutting down all raft nodes")
for _, n := range s.raftNodes {
nodes = append(nodes, n)
}
@@ -625,11 +654,12 @@ func (s *Server) transferRaftLeaders() bool {
if s == nil {
return false
}
var nodes []RaftNode
s.rnMu.RLock()
if len(s.raftNodes) > 0 {
s.Debugf("Transferring any raft leaders")
if len(s.raftNodes) == 0 {
s.rnMu.RUnlock()
return false
}
nodes := make([]RaftNode, 0, len(s.raftNodes))
for _, n := range s.raftNodes {
nodes = append(nodes, n)
}
@@ -668,7 +698,8 @@ func (n *raft) Propose(data []byte) error {
return nil
}
// ProposeDirect will propose entries directly.
// ProposeDirect will propose entries directly by skipping the Raft state
// machine and sending them straight to the wire instead.
// This should only be called on the leader.
func (n *raft) ProposeDirect(entries []*Entry) error {
if state := n.State(); state != Leader {
@@ -746,13 +777,16 @@ func (n *raft) ProposeRemovePeer(peer string) error {
return werr
}
// If we are the leader then we are responsible for processing the
// peer remove and then notifying the rest of the group that the
// peer was removed.
if isLeader {
prop.push(newEntry(EntryRemovePeer, []byte(peer)))
n.doRemovePeerAsLeader(peer)
return nil
}
// Need to forward.
// Otherwise we need to forward the proposal to the leader.
n.sendRPC(subj, _EMPTY_, []byte(peer))
return nil
}
@@ -779,7 +813,8 @@ func (n *raft) AdjustBootClusterSize(csz int) error {
if csz < 2 {
csz = 2
}
// Adjust.
// Adjust the cluster size and the number of nodes needed to establish
// a quorum.
n.csz = csz
n.qn = n.csz/2 + 1
@@ -798,7 +833,8 @@ func (n *raft) AdjustClusterSize(csz int) error {
csz = 2
}
// Adjust.
// Adjust the cluster size and the number of nodes needed to establish
// a quorum.
n.csz = csz
n.qn = n.csz/2 + 1
n.Unlock()
@@ -808,7 +844,8 @@ func (n *raft) AdjustClusterSize(csz int) error {
}
// PauseApply will allow us to pause processing of append entries onto our
// external apply chan.
// external apply queue. In effect this means that the upper layer will no longer
// receive any new entries from the Raft group.
func (n *raft) PauseApply() error {
if n.State() == Leader {
return errAlreadyLeader
@@ -832,6 +869,8 @@ func (n *raft) PauseApply() error {
return nil
}
// ResumeApply will resume sending applies to the external apply queue. This
// means that we will start sending new entries to the upper layer.
func (n *raft) ResumeApply() {
n.Lock()
defer n.Unlock()
@@ -862,8 +901,9 @@ func (n *raft) ResumeApply() {
}
}
// Applied is to be called when the FSM has applied the committed entries.
// Applied will return the number of entries and an estimation of the
// Applied is a callback that must be called by the upper layer when it
// has successfully applied the committed entries that it received from the
// apply queue. It will return the number of entries and an estimation of the
// byte size that could be removed with a snapshot/compact.
func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
n.Lock()
@@ -878,6 +918,9 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
if index > n.applied {
n.applied = index
}
// Calculate the number of entries and estimate the byte size that
// we can now remove with a compaction/snapshot.
var state StreamState
n.wal.FastState(&state)
if n.applied > state.FirstSeq {
@@ -945,11 +988,14 @@ func (n *raft) InstallSnapshot(data []byte) error {
n.Lock()
// If a write error has occurred already then stop here.
if werr := n.werr; werr != nil {
n.Unlock()
return werr
}
// Check that a catchup isn't already taking place. If it is then we won't
// allow installing snapshots until it is done.
if len(n.progress) > 0 {
n.Unlock()
return errCatchupsRunning
@@ -967,10 +1013,13 @@ func (n *raft) InstallSnapshot(data []byte) error {
var term uint64
if ae, _ := n.loadEntry(n.applied); ae != nil {
// Use the term from the most recently applied entry if possible.
term = ae.term
} else if ae, _ = n.loadFirstEntry(); ae != nil {
// Otherwise see if we can find the term from the first entry.
term = ae.term
} else {
// Last resort is to use the last pterm that we knew of.
term = n.pterm
}
@@ -1013,6 +1062,9 @@ func (n *raft) InstallSnapshot(data []byte) error {
return nil
}
// NeedSnapshot returns true if it is necessary to try to install a snapshot, i.e.
// after we have finished recovering/replaying at startup, on a regular interval or
// as a part of cleaning up when shutting down.
func (n *raft) NeedSnapshot() bool {
n.RLock()
defer n.RUnlock()
@@ -1024,6 +1076,8 @@ const (
snapFileT = "snap.%d.%d"
)
// termAndIndexFromSnapFile tries to load the snapshot file and returns the term
// and index from that snapshot.
func termAndIndexFromSnapFile(sn string) (term, index uint64, err error) {
if sn == _EMPTY_ {
return 0, 0, errBadSnapName
@@ -1035,6 +1089,9 @@ func termAndIndexFromSnapFile(sn string) (term, index uint64, err error) {
return term, index, nil
}
// setupLastSnapshot is called at startup to try and recover the last snapshot from
// the disk if possible. We will try to recover the term, index and commit/applied
// indices and then notify the upper layer what we found. Compacts the WAL if needed.
func (n *raft) setupLastSnapshot() {
snapDir := filepath.Join(n.sd, snapshotsDir)
psnaps, err := os.ReadDir(snapDir)
@@ -1084,19 +1141,25 @@ func (n *raft) setupLastSnapshot() {
n.snapfile = latest
snap, err := n.loadLastSnapshot()
if err != nil {
// We failed to recover the last snapshot for some reason, so we will
// assume it has been corrupted and will try to delete it.
if n.snapfile != _EMPTY_ {
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
}
} else {
n.pindex = snap.lastIndex
n.pterm = snap.lastTerm
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
}
return
}
// We successfully recovered the last snapshot from the disk.
// Recover state from the snapshot and then notify the upper layer.
// Compact the WAL when we're done if needed.
n.pindex = snap.lastIndex
n.pterm = snap.lastTerm
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
}
}
@@ -1162,14 +1225,18 @@ func (n *raft) Leader() bool {
return n.State() == Leader
}
// isCatchingUp returns true if a catchup is currently taking place.
func (n *raft) isCatchingUp() bool {
	// Sample the catchup pointer under the read lock, then release the lock
	// before returning the result.
	n.RLock()
	active := n.catchup != nil
	n.RUnlock()
	return active
}
// This function may block for up to ~10ms to check
// forward progress in some cases.
// isCurrent is called from the healthchecks and returns true if we believe
// that the upper layer is current with the Raft layer, i.e. that it has applied
// all of the commits that we have given it.
// Optionally we can also check whether or not we're making forward progress if we
// aren't current, in which case this function may block for up to ~10ms to find out.
// Lock should be held.
func (n *raft) isCurrent(includeForwardProgress bool) bool {
// Check if we are closed.
@@ -1220,7 +1287,7 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
// Otherwise, wait for a short period of time and see if we are making any
// forward progress.
if startDelta := n.commit - n.applied; startDelta > 0 {
for i := 0; i < 10; i++ { // 5ms, in 0.5ms increments
for i := 0; i < 10; i++ { // 10ms, in 1ms increments
n.Unlock()
time.Sleep(time.Millisecond)
n.Lock()
@@ -1480,9 +1547,16 @@ func (n *raft) UpdateKnownPeers(knownPeers []string) {
}
}
// ApplyQ returns the apply queue that new commits will be sent to for the
// upper layer to apply.
func (n *raft) ApplyQ() *ipQueue[*CommittedEntry] {
	// The queue pointer is returned as-is; no lock is taken here.
	return n.apply
}
func (n *raft) LeadChangeC() <-chan bool { return n.leadc }
func (n *raft) QuitC() <-chan struct{} { return n.quit }
// LeadChangeC returns the leader change channel, notifying when the Raft
// leader role has moved.
func (n *raft) LeadChangeC() <-chan bool {
	// Exposed receive-only so callers cannot send on or close the channel.
	return n.leadc
}
// QuitC returns the quit channel, notifying when the Raft group has shut down.
func (n *raft) QuitC() <-chan struct{} {
	// Exposed receive-only so callers cannot send on or close the channel.
	return n.quit
}
func (n *raft) Created() time.Time {
n.RLock()
@@ -1679,6 +1753,10 @@ func (n *raft) resetElectWithLock(et time.Duration) {
n.Unlock()
}
// run is the top-level runner for the Raft state machine. Depending on the
// state of the node (leader, follower, candidate, observer), this will call
// through to other functions. It is expected that this function will run for
// the entire life of the Raft node once started.
func (n *raft) run() {
s := n.s
defer s.grWG.Done()
@@ -1715,9 +1793,6 @@ func (n *raft) run() {
n.runAsCandidate()
case Leader:
n.runAsLeader()
case Observer:
// TODO(dlc) - fix.
n.runAsFollower()
case Closed:
return
}
@@ -1765,7 +1840,8 @@ func (n *raft) setObserver(isObserver bool, extSt extensionState) {
n.extSt = extSt
}
// Invoked when being notified that there is something in the entryc's queue
// processAppendEntries is called by the Raft state machine when there are
// new append entries to be committed and sent to the upper state machine.
func (n *raft) processAppendEntries() {
canProcess := true
if n.isClosed() {
@@ -1776,7 +1852,8 @@ func (n *raft) processAppendEntries() {
n.debug("AppendEntry not processing inbound, no resources")
canProcess = false
}
// Always pop the entries, but check if we can process them.
// Always pop the entries, but check if we can process them. If we can't
// then the entries are effectively dropped.
aes := n.entry.pop()
if canProcess {
for _, ae := range aes {
@@ -1786,19 +1863,25 @@ func (n *raft) processAppendEntries() {
n.entry.recycle(&aes)
}
// runAsFollower is called by run and will block for as long as the node is
// running in the follower state.
func (n *raft) runAsFollower() {
for n.State() == Follower {
for {
elect := n.electTimer()
select {
case <-n.entry.ch:
// New append entries have arrived over the network.
n.processAppendEntries()
case <-n.s.quitCh:
// The server is shutting down.
n.shutdown(false)
return
case <-n.quit:
// The Raft node is shutting down.
return
case <-elect.C:
// The election timer has fired so we think it's time to call an election.
// If we are out of resources we just want to stay in this state for the moment.
if n.outOfResources() {
n.resetElectionTimeoutWithLock()
@@ -1820,17 +1903,23 @@ func (n *raft) runAsFollower() {
return
}
case <-n.votes.ch:
// We're receiving votes from the network, probably because we have only
// just stepped down and they were already in flight. Ignore them.
n.debug("Ignoring old vote response, we have stepped down")
n.votes.popOne()
case <-n.resp.ch:
// Ignore
// We're receiving append entry responses from the network, probably because
// we have only just stepped down and they were already in flight. Ignore them.
n.resp.popOne()
case <-n.reqs.ch:
// We've just received a vote request from the network.
// Because of drain() it is possible that we get nil from popOne().
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
// We've received a stepdown request, start following the new leader if
// we can.
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
@@ -1839,26 +1928,29 @@ func (n *raft) runAsFollower() {
}
}
// Pool for CommitedEntry re-use.
// Pool for CommittedEntry re-use.
var cePool = sync.Pool{
	// New supplies a zero-valued CommittedEntry whenever the pool is empty.
	New: func() any { return new(CommittedEntry) },
}
// CommitEntry is handed back to the user to apply a commit to their FSM.
// CommittedEntry is handed back to the user to apply a commit to their upper layer.
type CommittedEntry struct {
// Index is the index this entry was created with (set by newCommittedEntry).
Index uint64
// Entries is the list of entries carried by this commit.
Entries []*Entry
}
// Create a new ComittedEntry.
// Create a new CommittedEntry. When the returned entry is no longer needed, it
// should be returned to the pool by calling ReturnToPool.
func newCommittedEntry(index uint64, entries []*Entry) *CommittedEntry {
	// Take an entry from the pool and overwrite both fields, so whatever a
	// previous user left behind is not carried over.
	committed := cePool.Get().(*CommittedEntry)
	committed.Index = index
	committed.Entries = entries
	return committed
}
// ReturnToPool returns the CommittedEntry to the pool, after which point it is
// no longer safe to reuse.
func (ce *CommittedEntry) ReturnToPool() {
if ce == nil {
return
@@ -1879,7 +1971,8 @@ var entryPool = sync.Pool{
},
}
// Helper to create new entries.
// Helper to create new entries. When the returned entry is no longer needed, it
// should be returned to the entryPool pool.
func newEntry(t EntryType, data []byte) *Entry {
entry := entryPool.Get().(*Entry)
entry.Type, entry.Data = t, data
@@ -1895,15 +1988,15 @@ var aePool = sync.Pool{
// appendEntry is the main struct that is used to sync raft peers.
type appendEntry struct {
leader string
term uint64
commit uint64
pterm uint64
pindex uint64
entries []*Entry
// internal use only.
reply string
sub *subscription
leader string // The leader that this append entry came from.
term uint64 // The current term, as the leader understands it.
commit uint64 // The commit index, as the leader understands it.
pterm uint64 // The previous term, for checking consistency.
pindex uint64 // The previous commit index, for checking consistency.
entries []*Entry // Entries to process.
// Below fields are for internal use only:
reply string // Reply subject to respond to once committed.
sub *subscription // The subscription that the append entry came in on.
buf []byte
}
@@ -2166,12 +2259,15 @@ func (n *raft) runAsLeader() {
// For forwarded proposals, both normal and remove peer proposals.
fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
if err != nil {
n.debug("Error subscribing to forwarded proposals: %v", err)
n.warn("Error subscribing to forwarded proposals: %v", err)
n.stepdown.push(noLeader)
return
}
rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
if err != nil {
n.debug("Error subscribing to forwarded proposals: %v", err)
n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
n.unsubscribe(fsub)
n.stepdown.push(noLeader)
return
}
@@ -2537,7 +2633,7 @@ func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
return n.decodeAppendEntry(sm.msg, nil, _EMPTY_)
}
// applyCommit will update our commit index and apply the entry to the apply chan.
// applyCommit will update our commit index and apply the entry to the apply queue.
// lock should be held.
func (n *raft) applyCommit(index uint64) error {
if n.State() == Closed {
@@ -2779,7 +2875,7 @@ func (n *raft) runAsCandidate() {
// We vote for ourselves.
votes := 1
for n.State() == Candidate {
for {
elect := n.electTimer()
select {
case <-n.entry.ch:
@@ -2840,16 +2936,22 @@ func (n *raft) runAsCandidate() {
}
}
// handleAppendEntry handles an append entry from the wire.
// handleAppendEntry handles an append entry from the wire. This function
// is an internal callback from the "asubj" append entry subscription.
func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
	// Decode from the result of copyBytes(msg) rather than the slice handed to
	// this callback. NOTE(review): presumably the caller may reuse msg - confirm.
	ae, err := n.decodeAppendEntry(copyBytes(msg), sub, reply)
	if err != nil {
		n.warn("AppendEntry failed to be placed on internal channel: corrupt entry")
		return
	}
	// Queue the decoded entry. One of the worker goroutines (runAsLeader,
	// runAsFollower, runAsCandidate) will pick it up from here.
	n.entry.push(ae)
}
// cancelCatchup will stop an in-flight catchup by unsubscribing from the
// catchup subscription.
// Lock should be held.
func (n *raft) cancelCatchup() {
n.debug("Canceling catchup subscription since we are now up to date")
@@ -2875,6 +2977,9 @@ func (n *raft) catchupStalled() bool {
return false
}
// createCatchup will create the state needed to track a catchup as it
// runs. It then creates a unique inbox for this catchup and subscribes
// to it. The remote side will stream entries to that subject.
// Lock should be held.
func (n *raft) createCatchup(ae *appendEntry) string {
// Cleanup any old ones.
@@ -2938,7 +3043,7 @@ func (n *raft) truncateWAL(term, index uint64) {
n.term, n.pterm, n.pindex = term, term, index
}
// Reset our WAL.
// Reset our WAL. This is equivalent to truncating all data from the log.
// Lock should be held.
func (n *raft) resetWAL() {
n.truncateWAL(0, 0)
@@ -2952,7 +3057,9 @@ func (n *raft) updateLeader(newLeader string) {
}
}
// processAppendEntry will process an appendEntry.
// processAppendEntry will process an appendEntry. This is called either
// during recovery or from processAppendEntries when there are new entries
// to be committed.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Lock()
// Don't reset here if we have been asked to assume leader position.
@@ -2991,7 +3098,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return
}
// If we received an append entry as a candidate we should convert to a follower.
// If we received an append entry as a candidate then it would appear that
// another node has taken on the leader role already, so we should convert
// to a follower of that node instead.
if n.State() == Candidate {
n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
if n.term < ae.term {
@@ -3004,7 +3113,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Catching up state.
catchingUp := n.catchup != nil
// Is this a new entry?
// Is this a new entry? New entries will be delivered on the append entry
// sub, rather than a catch-up sub.
isNew := sub != nil && sub == n.aesub
// Track leader directly
@@ -3204,7 +3314,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.lxfer = true
n.xferCampaign()
} else if n.paused && !n.pobserver {
// Here we can become a leader but need to wait for resume of the apply channel.
// Here we can become a leader but need to wait for resume of the apply queue.
n.lxfer = true
}
} else {
@@ -3254,9 +3364,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}
// processPeerState is called when a peer state entry is received
// over the wire or when we're updating known peers.
// Lock should be held.
func (n *raft) processPeerState(ps *peerState) {
// Update our version of peers to that of the leader.
// Update our version of peers to that of the leader. Calculate
// the number of nodes needed to establish a quorum.
n.csz = ps.clusterSize
n.qn = n.csz/2 + 1
@@ -3274,15 +3387,19 @@ func (n *raft) processPeerState(ps *peerState) {
n.writePeerState(ps)
}
// Process a response.
// processAppendEntryResponse is called when we receive an append entry
// response from another node. They will send a confirmation to tell us
// whether they successfully committed the entry or not.
func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
n.trackPeer(ar.peer)
if ar.success {
// The remote node successfully committed the append entry.
n.trackResponse(ar)
arPool.Put(ar)
} else if ar.term > n.term {
// False here and they have a higher term.
// The remote node didn't commit the append entry, it looks like
// they are on a newer term than we are. Step down.
n.Lock()
n.term = ar.term
n.vote = noVote
@@ -3293,6 +3410,8 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
n.Unlock()
arPool.Put(ar)
} else if ar.reply != _EMPTY_ {
// The remote node didn't commit the append entry and they are
// still on the same term, so let's try to catch them up.
n.catchupFollower(ar)
}
}
@@ -3308,7 +3427,8 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
return newAppendEntry(n.id, n.term, n.commit, n.pterm, n.pindex, entries)
}
// Determine if we should store an entry.
// Determine if we should store an entry. This stops us from storing
// heartbeat messages.
func (ae *appendEntry) shouldStore() bool {
	// A nil append entry has nothing to store.
	if ae == nil {
		return false
	}
	// Only append entries that actually carry entries are stored.
	return len(ae.entries) > 0
}
@@ -3621,6 +3741,7 @@ func (n *raft) fileWriter() {
case <-n.quit:
return
case <-n.wtvch:
// We've been asked to write out the term-and-vote file.
var buf [termVoteLen]byte
n.RLock()
copy(buf[0:], n.wtv)
@@ -3633,6 +3754,7 @@ func (n *raft) fileWriter() {
n.warn("Error writing term and vote file for %q: %v", n.group, err)
}
case <-n.wpsch:
// We've been asked to write out the peer state file.
n.RLock()
buf := copyBytes(n.wps)
n.RUnlock()
@@ -3656,7 +3778,7 @@ func (n *raft) writeTermVote() {
copy(buf[8:], n.vote)
b := buf[:8+len(n.vote)]
// If same as what we have we can ignore.
// If the term and vote haven't changed then don't rewrite to disk.
if bytes.Equal(n.wtv, b) {
return
}
@@ -3734,7 +3856,8 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
vresp := &voteResponse{n.term, n.id, false}
defer n.debug("Sending a voteResponse %+v -> %q", vresp, vr.reply)
// Ignore if we are newer.
// Ignore if we are newer. This is important so that we don't accidentally process
// votes from a previous term if they were still in flight somewhere.
if vr.term < n.term {
n.Unlock()
n.sendReply(vr.reply, vresp.encode())

View File

@@ -239,6 +239,7 @@ type stream struct {
ddindex int
ddtmr *time.Timer
qch chan struct{}
mqch chan struct{}
active bool
ddloaded bool
closed bool
@@ -558,6 +559,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
mqch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
}
@@ -785,6 +787,15 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
}
}
// monitorQuitC returns the stream's monitor quit channel (mqch), read under
// the stream read lock. It is safe to call on a nil stream, in which case nil
// is returned. stop() closes this channel and then sets the field to nil, so
// callers after that point also get nil.
func (mset *stream) monitorQuitC() <-chan struct{} {
	if mset == nil {
		return nil
	}
	mset.mu.RLock()
	quit := mset.mqch
	mset.mu.RUnlock()
	return quit
}
func (mset *stream) updateC() <-chan struct{} {
if mset == nil {
return nil
@@ -985,14 +996,6 @@ func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
return mset.lseq, mset.getCLFS()
}
func (mset *stream) clearCLFS() uint64 {
mset.clMu.Lock()
defer mset.clMu.Unlock()
clfs := mset.clfs
mset.clfs, mset.clseq = 0, 0
return clfs
}
func (mset *stream) getCLFS() uint64 {
mset.clMu.Lock()
defer mset.clMu.Unlock()
@@ -4077,6 +4080,7 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac
var (
// errLastSeqMismatch: NOTE(review) presumably reported when an expected last
// sequence does not match the stream's; the use site is not visible here - confirm.
errLastSeqMismatch = errors.New("last sequence mismatch")
// errMsgIdDuplicate: NOTE(review) presumably reported for a duplicate message
// ID; the use site is not visible here - confirm.
errMsgIdDuplicate = errors.New("msgid is duplicate")
// errStreamClosed is returned by processJetStreamMsg when the stream is marked
// closed or no longer has an internal client.
errStreamClosed = errors.New("stream closed")
)
// processJetStreamMsg is where we try to actually process the stream msg.
@@ -4085,7 +4089,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
c, s, store := mset.client, mset.srv, mset.store
if mset.closed || c == nil {
mset.mu.Unlock()
return nil
return errStreamClosed
}
// Apply the input subject transform if any
@@ -4415,7 +4419,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
// Check for preAcks and the need to skip vs store.
if mset.hasAllPreAcks(seq, subject) {
mset.clearAllPreAcks(seq)
store.SkipMsg()
@@ -4907,9 +4910,28 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
accName := jsa.account.Name
jsa.mu.Unlock()
// Clean up consumers.
// Mark as closed, kick monitor and collect consumers first.
mset.mu.Lock()
mset.closed = true
// Signal to the monitor loop.
// Can't use qch here.
if mset.mqch != nil {
close(mset.mqch)
mset.mqch = nil
}
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
mset.unsubscribeToStream(true)
// Our info sub if we spun it up.
if mset.infoSub != nil {
mset.srv.sysUnsubscribe(mset.infoSub)
mset.infoSub = nil
}
// Clean up consumers.
var obs []*consumer
for _, o := range mset.consumers {
obs = append(obs, o)
@@ -4930,21 +4952,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.cancelSourceConsumer(si.iname)
}
}
// Cluster cleanup
var sa *streamAssignment
if n := mset.node; n != nil {
if deleteFlag {
n.Delete()
sa = mset.sa
} else {
if n.NeedSnapshot() {
// Attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
}
n.Stop()
}
}
mset.mu.Unlock()
isShuttingDown := js.isShuttingDown()
@@ -4961,17 +4968,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
}
mset.mu.Lock()
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
mset.unsubscribeToStream(true)
// Our info sub if we spun it up.
if mset.infoSub != nil {
mset.srv.sysUnsubscribe(mset.infoSub)
mset.infoSub = nil
}
// Send stream delete advisory after the consumers.
if deleteFlag && advisory {
mset.sendDeleteAdvisoryLocked()
@@ -4983,11 +4979,17 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.qch = nil
}
c := mset.client
mset.client = nil
if c == nil {
mset.mu.Unlock()
return nil
// Cluster cleanup
var sa *streamAssignment
if n := mset.node; n != nil {
if deleteFlag {
n.Delete()
sa = mset.sa
} else {
// Always attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
n.Stop()
}
}
// Cleanup duplicate timer if running.
@@ -5013,6 +5015,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
// Snapshot store.
store := mset.store
c := mset.client
mset.client = nil
// Clustered cleanup.
mset.mu.Unlock()
@@ -5027,7 +5031,9 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
js.mu.Unlock()
}
c.closeConnection(ClientClosed)
if c != nil {
c.closeConnection(ClientClosed)
}
if sysc != nil {
sysc.closeConnection(ClientClosed)
@@ -5042,9 +5048,12 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
js.releaseStreamResources(&mset.cfg)
// cleanup directories after the stream
accDir := filepath.Join(js.config.StoreDir, accName)
// no op if not empty
os.Remove(filepath.Join(accDir, streamsDir))
os.Remove(accDir)
// Do cleanup in separate go routine similar to how fs will use purge here..
go func() {
// no op if not empty
os.Remove(filepath.Join(accDir, streamsDir))
os.Remove(accDir)
}()
} else if store != nil {
// Ignore errors.
store.Stop()

View File

@@ -693,9 +693,9 @@ func (s *Server) wsUpgrade(w http.ResponseWriter, r *http.Request) (*wsUpgradeRe
kind := CLIENT
if r.URL != nil {
ep := r.URL.EscapedPath()
if strings.HasPrefix(ep, leafNodeWSPath) {
if strings.HasSuffix(ep, leafNodeWSPath) {
kind = LEAF
} else if strings.HasPrefix(ep, mqttWSPath) {
} else if strings.HasSuffix(ep, mqttWSPath) {
kind = MQTT
}
}

8
vendor/modules.txt vendored
View File

@@ -1371,10 +1371,10 @@ github.com/mohae/deepcopy
# github.com/mschoch/smat v0.2.0
## explicit; go 1.13
github.com/mschoch/smat
# github.com/nats-io/jwt/v2 v2.5.2
# github.com/nats-io/jwt/v2 v2.5.3
## explicit; go 1.18
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.10.4
# github.com/nats-io/nats-server/v2 v2.10.5
## explicit; go 1.20
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/ldap
@@ -2079,8 +2079,8 @@ golang.org/x/text/transform
golang.org/x/text/unicode/bidi
golang.org/x/text/unicode/norm
golang.org/x/text/width
# golang.org/x/time v0.3.0
## explicit
# golang.org/x/time v0.4.0
## explicit; go 1.18
golang.org/x/time/rate
# golang.org/x/tools v0.14.0
## explicit; go 1.18