build(deps): bump github.com/nats-io/nats-server/v2

Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.5 to 2.10.7.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.10.5...v2.10.7)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-12-07 06:29:57 +00:00
committed by Ralf Haferkamp
parent b7df2404ec
commit 9a3455d191
19 changed files with 749 additions and 444 deletions

4
go.mod
View File

@@ -60,7 +60,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.2.1
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.5
github.com/nats-io/nats-server/v2 v2.10.7
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5
@@ -244,7 +244,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/juliangruber/go-intersect v1.1.0 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/libregraph/oidc-go v1.0.0 // indirect

8
go.sum
View File

@@ -1584,8 +1584,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -1742,8 +1742,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.5 h1:hhWt6m9ja/mNnm6ixc85jCthDaiUFPaeJI79K/MD980=
github.com/nats-io/nats-server/v2 v2.10.5/go.mod h1:xUMTU4kS//SDkJCSvFwN9SyJ9nUuLhSkzB/Qz0dvjjg=
github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y=
github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=

View File

@@ -142,7 +142,7 @@ Here is the overview of how we set up and use JetStream **streams**,
## JetStream API
All interactions with JetStream are performed via `mqttJSA` that sends NATS
requests to JetStream. Most are processed syncronously and await a response,
requests to JetStream. Most are processed synchronously and await a response,
some (e.g. `jsa.sendAck()`) are sent asynchronously. JetStream API is usually
referred to as `jsa` in the code. No special locking is required to use `jsa`,
however the asynchronous use of JetStream may create race conditions with

View File

@@ -336,6 +336,9 @@ var nbPoolLarge = &sync.Pool{
},
}
// nbPoolGet returns a frame that is a best-effort match for the given size.
// Once a pooled frame is no longer needed, it should be recycled by passing
// it to nbPoolPut.
func nbPoolGet(sz int) []byte {
switch {
case sz <= nbPoolSizeSmall:
@@ -347,6 +350,10 @@ func nbPoolGet(sz int) []byte {
}
}
// nbPoolPut recycles a frame that was retrieved from nbPoolGet. It is not
// safe to return multiple slices referring to chunks of the same underlying
// array as this may create overlaps when the buffers are returned to their
// original size, resulting in race conditions.
func nbPoolPut(b []byte) {
switch cap(b) {
case nbPoolSizeSmall:
@@ -1490,8 +1497,8 @@ func closedStateForErr(err error) ClosedState {
return ReadError
}
// collapsePtoNB will place primary onto nb buffer as needed in prep for WriteTo.
// This will return a copy on purpose.
// collapsePtoNB will either returned framed WebSocket buffers or it will
// return a reference to c.out.nb.
func (c *client) collapsePtoNB() (net.Buffers, int64) {
if c.isWebsocket() {
return c.wsCollapsePtoNB()
@@ -1834,11 +1841,11 @@ func (c *client) traceOutOp(op string, arg []byte) {
func (c *client) traceOp(format, op string, arg []byte) {
opa := []interface{}{}
if op != "" {
if op != _EMPTY_ {
opa = append(opa, op)
}
if arg != nil {
opa = append(opa, string(arg))
opa = append(opa, bytesToString(arg))
}
c.Tracef(format, opa)
}
@@ -2531,7 +2538,7 @@ func (c *client) processHeaderPub(arg []byte) error {
c.maxPayloadViolation(c.pa.size, maxPayload)
return ErrMaxPayload
}
if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) {
if c.opts.Pedantic && !IsValidLiteralSubject(bytesToString(c.pa.subject)) {
c.sendErr("Invalid Publish Subject")
}
return nil
@@ -2584,7 +2591,7 @@ func (c *client) processPub(arg []byte) error {
c.maxPayloadViolation(c.pa.size, maxPayload)
return ErrMaxPayload
}
if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) {
if c.opts.Pedantic && !IsValidLiteralSubject(bytesToString(c.pa.subject)) {
c.sendErr("Invalid Publish Subject")
}
return nil
@@ -2660,7 +2667,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw
acc := c.acc
srv := c.srv
sid := string(sub.sid)
sid := bytesToString(sub.sid)
// This check does not apply to SYSTEM or JETSTREAM or ACCOUNT clients (because they don't have a `nc`...)
if c.isClosed() && (kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT) {
@@ -2795,7 +2802,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription, enact b
acc.mu.RUnlock()
return nil
}
subj := string(sub.subject)
subj := bytesToString(sub.subject)
if len(acc.imports.streams) > 0 {
tokens = tokenizeSubjectIntoSlice(tsa[:0], subj)
for _, tk := range tokens {
@@ -2903,7 +2910,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri
if im.rtr == nil {
im.rtr = im.tr.reverse()
}
s := string(nsub.subject)
s := bytesToString(nsub.subject)
if ime.overlapSubj != _EMPTY_ {
s = ime.overlapSubj
}
@@ -3000,7 +3007,7 @@ func queueMatches(queue string, qsubs [][]*subscription) bool {
}
for _, qsub := range qsubs {
qs := qsub[0]
qname := string(qs.queue)
qname := bytesToString(qs.queue)
// NOTE: '*' and '>' tokens can also be valid
// queue names so we first check against the
@@ -3020,9 +3027,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
c.mu.Lock()
if !force && sub.max > 0 && sub.nm < sub.max {
c.Debugf(
"Deferring actual UNSUB(%s): %d max, %d received",
string(sub.subject), sub.max, sub.nm)
c.Debugf("Deferring actual UNSUB(%s): %d max, %d received", sub.subject, sub.max, sub.nm)
c.mu.Unlock()
return
}
@@ -3034,7 +3039,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
// Remove accounting if requested. This will be false when we close a connection
// with open subscriptions.
if remove {
delete(c.subs, string(sub.sid))
delete(c.subs, bytesToString(sub.sid))
if acc != nil {
acc.sl.Remove(sub)
}
@@ -3194,7 +3199,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac
// Remap subject if its a shadow subscription, treat like a normal client.
if rt.sub.im != nil {
if rt.sub.im.tr != nil {
to := rt.sub.im.tr.TransformSubject(string(subj))
to := rt.sub.im.tr.TransformSubject(bytesToString(subj))
subj = []byte(to)
} else if !rt.sub.im.usePub {
subj = []byte(rt.sub.im.to)
@@ -3367,7 +3372,7 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
// still process the message in hand, otherwise
// unsubscribe and drop message on the floor.
if sub.nm == sub.max {
client.Debugf("Auto-unsubscribe limit of %d reached for sid '%s'", sub.max, string(sub.sid))
client.Debugf("Auto-unsubscribe limit of %d reached for sid '%s'", sub.max, sub.sid)
// Due to defer, reverse the code order so that execution
// is consistent with other cases where we unsubscribe.
if shouldForward {
@@ -3503,7 +3508,7 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
c.addToPCD(client)
if client.trace {
client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil)
client.traceOutOp(bytesToString(mh[:len(mh)-LEN_CR_LF]), nil)
}
client.mu.Unlock()
@@ -3699,7 +3704,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
}
} else {
// Update our cache here.
c.perms.pcache.Store(string(subject), allowed)
c.perms.pcache.Store(subject, allowed)
if n := atomic.AddInt32(&c.perms.pcsz, 1); n > maxPermCacheSize {
c.prunePubPermsCache()
}
@@ -3711,7 +3716,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
func isServiceReply(reply []byte) bool {
// This function is inlined and checking this way is actually faster
// than byte-by-byte comparison.
return len(reply) > 3 && string(reply[:4]) == replyPrefix
return len(reply) > 3 && bytesToString(reply[:4]) == replyPrefix
}
// Test whether a reply subject is a service import or a gateway routed reply.
@@ -3721,9 +3726,9 @@ func isReservedReply(reply []byte) bool {
}
rLen := len(reply)
// Faster to check with string([:]) than byte-by-byte
if rLen > jsAckPreLen && string(reply[:jsAckPreLen]) == jsAckPre {
if rLen > jsAckPreLen && bytesToString(reply[:jsAckPreLen]) == jsAckPre {
return true
} else if rLen > gwReplyPrefixLen && string(reply[:gwReplyPrefixLen]) == gwReplyPrefix {
} else if rLen > gwReplyPrefixLen && bytesToString(reply[:gwReplyPrefixLen]) == gwReplyPrefix {
return true
}
return false
@@ -3745,7 +3750,7 @@ func (c *client) processInboundMsg(msg []byte) {
// selectMappedSubject will choose the mapped subject based on the client's inbound subject.
func (c *client) selectMappedSubject() bool {
nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject))
nsubj, changed := c.acc.selectMappedSubject(bytesToString(c.pa.subject))
if changed {
c.pa.mapped = c.pa.subject
c.pa.subject = []byte(nsubj)
@@ -3821,7 +3826,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
c.mu.Lock()
rl := c.rrTracking.rmap[string(c.pa.subject)]
if rl != nil {
delete(c.rrTracking.rmap, string(c.pa.subject))
delete(c.rrTracking.rmap, bytesToString(c.pa.subject))
}
c.mu.Unlock()
@@ -3861,6 +3866,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
// Go back to the sublist data structure.
if !ok {
// Match may use the subject here to populate a cache, so can not use bytesToString here.
r = acc.sl.Match(string(c.pa.subject))
if len(r.psubs)+len(r.qsubs) > 0 {
c.in.results[string(c.pa.subject)] = r
@@ -4105,7 +4111,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
if !shouldReturn && !isResponse && si.to == jsAllAPI {
subj := string(c.pa.subject)
subj := bytesToString(c.pa.subject)
if strings.HasPrefix(subj, jsRequestNextPre) || strings.HasPrefix(subj, jsDirectGetPre) {
checkJS = true
}
@@ -4215,7 +4221,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// Set clientInfo if present.
if ci != nil {
if b, _ := json.Marshal(ci); b != nil {
msg = c.setHeader(ClientInfoHdr, string(b), msg)
msg = c.setHeader(ClientInfoHdr, bytesToString(b), msg)
}
}
}
@@ -4447,7 +4453,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
if sub.im.tr != nil {
to := sub.im.tr.TransformSubject(string(subject))
to := sub.im.tr.TransformSubject(bytesToString(subject))
dsubj = append(_dsubj[:0], to...)
} else if sub.im.usePub {
dsubj = append(_dsubj[:0], subj...)
@@ -4600,7 +4606,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
if sub.im.tr != nil {
to := sub.im.tr.TransformSubject(string(subject))
to := sub.im.tr.TransformSubject(bytesToString(subject))
dsubj = append(_dsubj[:0], to...)
} else if sub.im.usePub {
dsubj = append(_dsubj[:0], subj...)
@@ -4677,7 +4683,7 @@ sendToRoutesOrLeafs:
if dc.kind == LEAF {
// Check two scenarios. One is inbound from a route (c.pa.origin)
if c.kind == ROUTER && len(c.pa.origin) > 0 {
if string(c.pa.origin) == dc.remoteCluster() {
if bytesToString(c.pa.origin) == dc.remoteCluster() {
continue
}
}
@@ -4737,7 +4743,7 @@ func (c *client) checkLeafClientInfoHeader(msg []byte) (dmsg []byte, setHdr bool
if ci.Account != remoteAcc {
ci.Account = remoteAcc
if b, _ := json.Marshal(ci); b != nil {
dmsg, setHdr = c.setHeader(ClientInfoHdr, string(b), msg), true
dmsg, setHdr = c.setHeader(ClientInfoHdr, bytesToString(b), msg), true
}
}
}
@@ -5162,7 +5168,8 @@ func (c *client) closeConnection(reason ClosedState) {
if kind == LEAF {
num = sub.qw
}
key := string(sub.subject) + " " + string(sub.queue)
// TODO(dlc) - Better to use string builder?
key := bytesToString(sub.subject) + " " + bytesToString(sub.queue)
if esub, ok := qsubs[key]; ok {
esub.n += num
} else {
@@ -5323,7 +5330,7 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
if genid := atomic.LoadUint64(&sl.genid); genid != pac.genid {
ok = false
delete(c.in.pacache, string(c.pa.pacache))
delete(c.in.pacache, bytesToString(c.pa.pacache))
} else {
acc = pac.acc
r = pac.results

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.10.5"
VERSION = "2.10.7"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -700,10 +700,15 @@ func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) {
mset.mu.RLock()
s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc
s, jsa, tierName, cfg, acc, closed := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc, mset.closed
retention := cfg.Retention
mset.mu.RUnlock()
// Check if this stream has closed.
if closed {
return nil, NewJSStreamInvalidError()
}
// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
// This can happen on startup with restored state where on meta replay we still do not have
// the assignment. Running in single server mode this always returns true.
@@ -812,8 +817,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerWQRequiresExplicitAckError()
}
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if len(mset.consumers) > 0 {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if len(subjects) == 0 {
mset.mu.Unlock()
return nil, NewJSConsumerWQMultipleUnfilteredError()
@@ -1790,6 +1795,10 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed || o.mset == nil {
return NewJSConsumerDoesNotExistError()
}
if err := o.acc.checkNewConsumerConfig(&o.cfg, cfg); err != nil {
return err
}
@@ -2451,8 +2460,11 @@ func (o *consumer) setStoreState(state *ConsumerState) error {
if state == nil || o.store == nil {
return nil
}
o.applyState(state)
return o.store.Update(state)
err := o.store.Update(state)
if err == nil {
o.applyState(state)
}
return err
}
// Update our state to the store.
@@ -4485,7 +4497,7 @@ func (o *consumer) checkPending() {
}
// Since we can update timestamps, we have to review all pending.
// We will now bail if we see an ack pending in bound to us via o.awl.
// We will now bail if we see an ack pending inbound to us via o.awl.
var expired []uint64
check := len(o.pending) > 1024
for seq, p := range o.pending {
@@ -5253,23 +5265,30 @@ func gatherSubjectFilters(filter string, filters []string) []string {
return filters
}
// Will check if we are running in the monitor already and if not set the appropriate flag.
func (o *consumer) checkInMonitor() bool {
// shouldStartMonitor will return true if we should start a monitor
// goroutine or will return false if one is already running.
func (o *consumer) shouldStartMonitor() bool {
o.mu.Lock()
defer o.mu.Unlock()
if o.inMonitor {
return true
return false
}
o.monitorWg.Add(1)
o.inMonitor = true
return false
return true
}
// Clear us being in the monitor routine.
// Clear the monitor running state. The monitor goroutine should
// call this in a defer to clean up on exit.
func (o *consumer) clearMonitorRunning() {
o.mu.Lock()
defer o.mu.Unlock()
o.inMonitor = false
if o.inMonitor {
o.monitorWg.Done()
o.inMonitor = false
}
}
// Test whether we are in the monitor routine.
@@ -5296,11 +5315,16 @@ func (o *consumer) checkStateForInterestStream() {
return
}
asflr := state.AckFloor.Stream
// Protect ourselves against rolling backwards.
if asflr&(1<<63) != 0 {
return
}
// We should make sure to update the acks.
var ss StreamState
mset.store.FastState(&ss)
asflr := state.AckFloor.Stream
for seq := ss.FirstSeq; seq <= asflr; seq++ {
mset.ackMsg(o, seq)
}

View File

@@ -157,7 +157,6 @@ type psi struct {
total uint64
fblk uint32
lblk uint32
subj string
}
type fileStore struct {
@@ -439,7 +438,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return nil, err
}
// Check if our prior remember a last past where we can see.
// Check if our prior state remembers a last sequence past where we can see.
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
if lmb, err := fs.newMsgBlockForWrite(); err == nil {
@@ -560,9 +559,8 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.mu.Lock()
new_cfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: *cfg}
old_cfg := fs.cfg
// Messages block reference fs.cfg.Subjects (in subjString) under the
// mb's lock, not fs' lock. So do the switch here under all existing
// message blocks' lock in order to silence the DATA RACE detector.
// The reference story has changed here, so this full msg block lock
// may not be needed.
fs.lockAllMsgBlocks()
fs.cfg = new_cfg
fs.unlockAllMsgBlocks()
@@ -1407,23 +1405,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
if !mb.dmap.Exists(seq) {
mb.msgs++
mb.bytes += uint64(rl)
// Rebuild per subject info if needed.
if slen > 0 {
if mb.fss == nil {
mb.fss = make(map[string]*SimpleState)
}
// For the lookup, we cast the byte slice and there won't be any copy
if ss := mb.fss[string(data[:slen])]; ss != nil {
ss.Msgs++
ss.Last = seq
} else {
// This will either use a subject from the config, or make a copy
// so we don't reference the underlying buffer.
subj := mb.subjString(data[:slen])
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
}
}
// Always set last
@@ -1464,6 +1445,30 @@ func (fs *fileStore) warn(format string, args ...any) {
fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}
// Track local state but ignore timestamps here.
func updateTrackingState(state *StreamState, mb *msgBlock) {
if state.FirstSeq == 0 {
state.FirstSeq = mb.first.seq
} else if mb.first.seq < state.FirstSeq {
state.FirstSeq = mb.first.seq
}
if mb.last.seq > state.LastSeq {
state.LastSeq = mb.last.seq
}
state.Msgs += mb.msgs
state.Bytes += mb.bytes
}
// Determine if our tracking states are the same.
func trackingStatesEqual(fs, mb *StreamState) bool {
// When a fs is brand new the fs state will have first seq of 0, but tracking mb may have 1.
// If either has a first sequence that is not 0 or 1 we will check if they are the same, otherwise skip.
if fs.FirstSeq > 1 || mb.FirstSeq > 1 {
return fs.Msgs == mb.Msgs && fs.FirstSeq == mb.FirstSeq && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes
}
return fs.Msgs == mb.Msgs && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes
}
// recoverFullState will attempt to receover our last full state and re-process any state changes
// that happened afterwards.
func (fs *fileStore) recoverFullState() (rerr error) {
@@ -1581,9 +1586,12 @@ func (fs *fileStore) recoverFullState() (rerr error) {
fs.warn("Stream state bad subject len (%d)", lsubj)
return errCorruptState
}
// If we have lots of subjects this will alloc for each one.
// We could reference the underlying buffer, but we could guess wrong if
// number of blocks is large and subjects is low, since we would reference buf.
subj := string(buf[bi : bi+lsubj])
bi += lsubj
psi := &psi{total: readU64(), fblk: uint32(readU64()), subj: subj}
psi := &psi{total: readU64(), fblk: uint32(readU64())}
if psi.total > 1 {
psi.lblk = uint32(readU64())
} else {
@@ -1595,6 +1603,9 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}
}
// Track the state as represented by the blocks themselves.
var mstate StreamState
if numBlocks := readU64(); numBlocks > 0 {
lastIndex := int(numBlocks - 1)
fs.blks = make([]*msgBlock, 0, numBlocks)
@@ -1626,6 +1637,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// Only add in if not empty or the lmb.
if mb.msgs > 0 || i == lastIndex {
fs.addMsgBlock(mb)
updateTrackingState(&mstate, mb)
} else {
// Mark dirty to cleanup.
fs.dirty++
@@ -1671,20 +1683,37 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since recover will add in the new one.
fs.removeMsgBlockFromList(mb)
// Reverse update of tracking state for this mb, will add new state in below.
mstate.Msgs -= mb.msgs
mstate.Bytes -= mb.bytes
if nmb, err := fs.recoverMsgBlockNoSubjectUpdates(mb.index); err != nil && !os.IsNotExist(err) {
fs.warn("Stream state could not recover last msg block")
os.Remove(fn)
return errCorruptState
} else if nmb != nil {
fs.adjustAccounting(mb, nmb)
updateTrackingState(&mstate, mb)
}
}
// On success double check our state.
checkState := func() error {
// We check first and last seq and number of msgs and bytes. If there is a difference,
// return and error so we rebuild from the message block state on disk.
if !trackingStatesEqual(&fs.state, &mstate) {
fs.warn("Stream state encountered internal inconsistency on recover")
os.Remove(fn)
return errCorruptState
}
return nil
}
// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
for bi := blkIndex + 1; ; bi++ {
nmb, err := fs.recoverMsgBlock(bi)
if err != nil {
if os.IsNotExist(err) {
return nil
return checkState()
}
os.Remove(fn)
fs.warn("Stream state could not recover msg block %d", bi)
@@ -1702,6 +1731,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}
fs.state.Msgs += nmb.msgs
fs.state.Bytes += nmb.bytes
updateTrackingState(&mstate, nmb)
}
}
}
@@ -1739,7 +1769,7 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) {
info.lblk = nmb.index
}
} else {
fs.psim[sm.subj] = &psi{total: 1, fblk: nmb.index, lblk: nmb.index, subj: sm.subj}
fs.psim[sm.subj] = &psi{total: 1, fblk: nmb.index, lblk: nmb.index}
fs.tsl += len(sm.subj)
}
}
@@ -2864,7 +2894,6 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
if lmb := fs.lmb; lmb != nil {
index = lmb.index + 1
// Determine if we can reclaim any resources here.
if fs.fip {
lmb.mu.Lock()
@@ -2877,8 +2906,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
}
}
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways}
mb := fs.initMsgBlock(index)
// Lock should be held to quiet race detector.
mb.mu.Lock()
mb.setupWriteCache(rbuf)
@@ -2900,12 +2928,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
}
mb.hh = hh
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, mb.index))
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file [%q]: %v", mb.mfn, err)
return nil, fmt.Errorf("Error creating msg block file: %v", err)
}
mb.mfd = mfd
@@ -3017,7 +3043,7 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
info.lblk = index
}
} else {
fs.psim[subj] = &psi{total: 1, fblk: index, lblk: index, subj: subj}
fs.psim[subj] = &psi{total: 1, fblk: index, lblk: index}
fs.tsl += len(subj)
}
}
@@ -3237,6 +3263,9 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
// Will check the msg limit and drop firstSeq msg if needed.
// Lock should be held.
func (fs *fileStore) enforceMsgLimit() {
if fs.cfg.Discard != DiscardOld {
return
}
if fs.cfg.MaxMsgs <= 0 || fs.state.Msgs <= uint64(fs.cfg.MaxMsgs) {
return
}
@@ -3251,6 +3280,9 @@ func (fs *fileStore) enforceMsgLimit() {
// Will check the bytes limit and drop msgs if needed.
// Lock should be held.
func (fs *fileStore) enforceBytesLimit() {
if fs.cfg.Discard != DiscardOld {
return
}
if fs.cfg.MaxBytes <= 0 || fs.state.Bytes <= uint64(fs.cfg.MaxBytes) {
return
}
@@ -3692,15 +3724,11 @@ func (mb *msgBlock) compact() {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
if !firstSet {
firstSet, fseq = true, seq
firstSet = true
atomic.StoreUint64(&mb.first.seq, seq)
}
}
}
// Always set last as long as not a tombstone.
if seq&tbit == 0 {
atomic.StoreUint64(&mb.last.seq, seq&^ebit)
}
// Advance to next record.
index += rl
}
@@ -3742,10 +3770,15 @@ func (mb *msgBlock) compact() {
return
}
// Wipe dmap and rebuild here.
mb.dmap.Empty()
mb.rebuildStateLocked()
// Capture the updated rbytes.
mb.rbytes = uint64(len(nbuf))
// Remove any seqs from the beginning of the blk.
for seq, nfseq := fseq, atomic.LoadUint64(&mb.first.seq); seq < nfseq; seq++ {
mb.dmap.Delete(seq)
}
// Make sure we clear the cache since no longer valid.
mb.clearCacheAndOffset()
// If we entered with the msgs loaded make sure to reload them.
if wasLoaded {
mb.loadMsgsWithLock()
@@ -5060,7 +5093,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
ss.Msgs++
ss.Last = seq
} else {
mb.fss[mb.subjString(bsubj)] = &SimpleState{Msgs: 1, First: seq, Last: seq}
mb.fss[string(bsubj)] = &SimpleState{
Msgs: 1,
First: seq,
Last: seq,
}
}
}
}
@@ -5626,39 +5663,14 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
sm.msg = sm.buf[0 : end-slen]
}
sm.seq, sm.ts = seq, ts
// Treat subject a bit different to not reference underlying buf.
if slen > 0 {
sm.subj = mb.subjString(data[:slen])
// Make a copy since sm.subj lifetime may last longer.
sm.subj = string(data[:slen])
}
return sm, nil
}
// Given the `key` byte slice, this function will return the subject
// as an interned string of `key` or a configured subject as to minimize memory allocations.
// We used to have a pool structure when we leaned on block fss, which could duplicate subjects.
// Now we have fs scoped PSIM that is always present and is already tracking all in-use subjects.
// Lock should be held.
func (fs *fileStore) subjString(skey []byte) string {
if fs == nil || len(skey) == 0 {
return _EMPTY_
}
if len(fs.psim) > 0 {
// Cast in place below to avoid allocation for lookup.
if psi := fs.psim[string(skey)]; psi != nil {
return psi.subj
}
}
return string(skey)
}
// Given the `key` byte slice, this function will return the subject
// as an interned string of `key` or a configured subject as to minimize memory allocations.
// Lock should be held.
func (mb *msgBlock) subjString(skey []byte) string {
return mb.fs.subjString(skey)
}
// LoadMsg will lookup the message by sequence number and return it if found.
func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
return fs.msgForSeq(seq, sm)
@@ -6316,6 +6328,8 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
smb.mu.Lock()
if atomic.LoadUint64(&smb.first.seq) == seq {
fs.state.FirstSeq = atomic.LoadUint64(&smb.first.seq)
fs.state.FirstTime = time.Unix(0, smb.first.ts).UTC()
goto SKIP
}
@@ -6758,7 +6772,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
@@ -6869,7 +6883,7 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) {
info.lblk = mb.index
}
} else {
fs.psim[subj] = &psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index, subj: subj}
fs.psim[subj] = &psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index}
fs.tsl += len(subj)
}
}
@@ -6947,7 +6961,7 @@ func (fs *fileStore) Delete() error {
// Do Purge() since if we have lots of blocks uses a mv/rename.
fs.Purge()
if err := fs.Stop(); err != nil {
if err := fs.stop(false); err != nil {
return err
}
@@ -7006,7 +7020,7 @@ const (
// This is also called during Stop().
func (fs *fileStore) flushStreamStateLoop(fch, qch, done chan struct{}) {
// Make sure we do not try to write these out too fast.
const writeThreshold = time.Second * 10
const writeThreshold = time.Minute
lastWrite := time.Time{}
// We will use these to complete the full state write while not doing them too fast.
@@ -7065,7 +7079,6 @@ func timestampNormalized(t time.Time) int64 {
// 4. Last block index and hash of record inclusive to this stream state.
func (fs *fileStore) writeFullState() error {
fs.mu.Lock()
if fs.closed || fs.dirty == 0 {
fs.mu.Unlock()
return nil
@@ -7133,6 +7146,9 @@ func (fs *fileStore) writeFullState() error {
baseTime := timestampNormalized(fs.state.FirstTime)
var scratch [8 * 1024]byte
// Track the state as represented by the mbs.
var mstate StreamState
var dmapTotalLen int
for _, mb := range fs.blks {
mb.mu.RLock()
@@ -7157,6 +7173,7 @@ func (fs *fileStore) writeFullState() error {
mb.ensureLastChecksumLoaded()
copy(lchk[0:], mb.lchk[:])
}
updateTrackingState(&mstate, mb)
mb.mu.RUnlock()
}
if dmapTotalLen > 0 {
@@ -7186,9 +7203,22 @@ func (fs *fileStore) writeFullState() error {
// Snapshot prior dirty count.
priorDirty := fs.dirty
// Check tracking state.
statesEqual := trackingStatesEqual(&fs.state, &mstate)
// Release lock.
fs.mu.Unlock()
// Check consistency here.
if !statesEqual {
fs.warn("Stream state encountered internal inconsistency on write")
// Rebuild our fs state from the mb state.
fs.rebuildState(nil)
// Make sure to reprocess.
fs.kickFlushStateLoop()
return errCorruptState
}
if cap(buf) > sz {
fs.warn("WriteFullState reallocated from %d to %d", sz, cap(buf))
}
@@ -7225,6 +7255,11 @@ func (fs *fileStore) writeFullState() error {
// Stop the current filestore.
func (fs *fileStore) Stop() error {
return fs.stop(true)
}
// Stop the current filestore.
func (fs *fileStore) stop(writeState bool) error {
fs.mu.Lock()
if fs.closed || fs.closing {
fs.mu.Unlock()
@@ -7235,7 +7270,9 @@ func (fs *fileStore) Stop() error {
// so we don't end up with this function running more than once.
fs.closing = true
fs.checkAndFlushAllBlocks()
if writeState {
fs.checkAndFlushAllBlocks()
}
fs.closeAllMsgBlocks(false)
fs.cancelSyncTimer()
@@ -7247,13 +7284,15 @@ func (fs *fileStore) Stop() error {
fs.qch = nil
}
// Wait for the state flush loop to exit.
fsld := fs.fsld
fs.mu.Unlock()
<-fsld
// Write full state if needed. If not dirty this is a no-op.
fs.writeFullState()
fs.mu.Lock()
if writeState {
// Wait for the state flush loop to exit.
fsld := fs.fsld
fs.mu.Unlock()
<-fsld
// Write full state if needed. If not dirty this is a no-op.
fs.writeFullState()
fs.mu.Lock()
}
// Mark as closed. Last message block needs to be cleared after
// writeFullState has completed.
@@ -8473,6 +8512,12 @@ func decodeConsumerState(buf []byte) (*ConsumerState, error) {
}
}
// Protect ourselves against rolling backwards.
const hbit = 1 << 63
if state.AckFloor.Stream&hbit != 0 || state.Delivered.Stream&hbit != 0 {
return nil, errCorruptState
}
// We have additional stuff.
if numPending := readLen(); numPending > 0 {
mints := readTimeStamp()
@@ -8790,4 +8835,5 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {
output = append(output, checksum...)
return output, reader.Close()
}

View File

@@ -1,4 +1,4 @@
// Copyright 2018-2020 The NATS Authors
// Copyright 2018-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1225,22 +1225,22 @@ func (s *Server) forwardNewGatewayToLocalCluster(oinfo *Info) {
// messages from the remote's outbound connection. This side is
// the one sending the subscription interest.
func (s *Server) sendQueueSubsToGateway(c *client) {
s.sendSubsToGateway(c, nil)
s.sendSubsToGateway(c, _EMPTY_)
}
// Sends all subscriptions for the given account to the remove gateway
// This is sent from the inbound side, that is, the side that receives
// messages from the remote's outbound connection. This side is
// the one sending the subscription interest.
func (s *Server) sendAccountSubsToGateway(c *client, accName []byte) {
func (s *Server) sendAccountSubsToGateway(c *client, accName string) {
s.sendSubsToGateway(c, accName)
}
func gwBuildSubProto(buf *bytes.Buffer, accName []byte, acc map[string]*sitally, doQueues bool) {
func gwBuildSubProto(buf *bytes.Buffer, accName string, acc map[string]*sitally, doQueues bool) {
for saq, si := range acc {
if doQueues && si.q || !doQueues && !si.q {
buf.Write(rSubBytes)
buf.Write(accName)
buf.WriteString(accName)
buf.WriteByte(' ')
// For queue subs (si.q is true), saq will be
// subject + ' ' + queue, for plain subs, this is
@@ -1255,7 +1255,7 @@ func gwBuildSubProto(buf *bytes.Buffer, accName []byte, acc map[string]*sitally,
}
// Sends subscriptions to remote gateway.
func (s *Server) sendSubsToGateway(c *client, accountName []byte) {
func (s *Server) sendSubsToGateway(c *client, accountName string) {
var (
bufa = [32 * 1024]byte{}
bbuf = bytes.NewBuffer(bufa[:0])
@@ -1268,22 +1268,22 @@ func (s *Server) sendSubsToGateway(c *client, accountName []byte) {
defer gw.pasi.Unlock()
// If account is specified...
if accountName != nil {
if accountName != _EMPTY_ {
// Simply send all plain subs (no queues) for this specific account
gwBuildSubProto(bbuf, accountName, gw.pasi.m[string(accountName)], false)
gwBuildSubProto(bbuf, accountName, gw.pasi.m[accountName], false)
// Instruct to send all subs (RS+/-) for this account from now on.
c.mu.Lock()
e := c.gw.insim[string(accountName)]
e := c.gw.insim[accountName]
if e == nil {
e = &insie{}
c.gw.insim[string(accountName)] = e
c.gw.insim[accountName] = e
}
e.mode = InterestOnly
c.mu.Unlock()
} else {
// Send queues for all accounts
for accName, acc := range gw.pasi.m {
gwBuildSubProto(bbuf, []byte(accName), acc, true)
gwBuildSubProto(bbuf, accName, acc, true)
}
}
@@ -1899,7 +1899,7 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
return nil
}
if e.sl.Remove(sub) == nil {
delete(c.subs, string(key))
delete(c.subs, bytesToString(key))
if queue != nil {
e.qsubs--
atomic.AddInt64(&c.srv.gateway.totalQSubs, -1)
@@ -1976,7 +1976,7 @@ func (c *client) processGatewayRSub(arg []byte) error {
}
defer c.mu.Unlock()
ei, _ := c.gw.outsim.Load(string(accName))
ei, _ := c.gw.outsim.Load(bytesToString(accName))
// We should always have an existing entry for plain subs because
// in optimistic mode we would have received RS- first, and
// in full knowledge, we are receiving RS+ for an account after
@@ -2038,7 +2038,7 @@ func (c *client) processGatewayRSub(arg []byte) error {
srv = c.srv
callUpdate = true
} else {
subj := string(subject)
subj := bytesToString(subject)
// If this is an RS+ for a wc subject, then
// remove from the no interest map all subjects
// that are a subset of this wc subject.
@@ -2149,8 +2149,8 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription
accProtoa [256]byte
accProto []byte
proto []byte
subject = string(sub.subject)
hasWc = subjectHasWildcard(subject)
subject = bytesToString(sub.subject)
hasWC = subjectHasWildcard(subject)
)
for _, c := range gws {
proto = nil
@@ -2165,7 +2165,7 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription
// For wildcard subjects, we will remove from our no-interest
// map, all subjects that are a subset of this wc subject, but we
// still send the wc subject and let the remote do its own cleanup.
if hasWc {
if hasWC {
for enis := range e.ni {
if subjectIsSubsetMatch(enis, subject) {
delete(e.ni, enis)
@@ -2337,7 +2337,7 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
} else {
entry.n += change
if entry.n <= 0 {
delete(st, string(key))
delete(st, bytesToString(key))
last = true
if len(st) == 0 {
delete(accMap, accName)
@@ -2381,7 +2381,7 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
// that is, starts with $GNR and is long enough to contain cluster/server hash
// and subject.
func isGWRoutedReply(subj []byte) bool {
return len(subj) > gwSubjectOffset && string(subj[:gwReplyPrefixLen]) == gwReplyPrefix
return len(subj) > gwSubjectOffset && bytesToString(subj[:gwReplyPrefixLen]) == gwReplyPrefix
}
// Same than isGWRoutedReply but accepts the old prefix $GR and returns
@@ -2390,7 +2390,7 @@ func isGWRoutedSubjectAndIsOldPrefix(subj []byte) (bool, bool) {
if isGWRoutedReply(subj) {
return true, false
}
if len(subj) > oldGWReplyStart && string(subj[:oldGWReplyPrefixLen]) == oldGWReplyPrefix {
if len(subj) > oldGWReplyStart && bytesToString(subj[:oldGWReplyPrefixLen]) == oldGWReplyPrefix {
return true, true
}
return false, false
@@ -2399,7 +2399,7 @@ func isGWRoutedSubjectAndIsOldPrefix(subj []byte) (bool, bool) {
// Returns true if subject starts with "$GNR.". This is to check that
// clients can't publish on this subject.
func hasGWRoutedReplyPrefix(subj []byte) bool {
return len(subj) > gwReplyPrefixLen && string(subj[:gwReplyPrefixLen]) == gwReplyPrefix
return len(subj) > gwReplyPrefixLen && bytesToString(subj[:gwReplyPrefixLen]) == gwReplyPrefix
}
// Evaluates if the given reply should be mapped or not.
@@ -2455,7 +2455,6 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
return false
}
var (
subj = string(subject)
queuesa = [512]byte{}
queues = queuesa[:0]
accName = acc.Name
@@ -2499,7 +2498,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
}
} else {
// Plain sub interest and queue sub results for this account/subject
psi, qr := gwc.gatewayInterest(accName, subj)
psi, qr := gwc.gatewayInterest(accName, string(subject))
if !psi && qr == nil {
continue
}
@@ -2760,11 +2759,11 @@ func (s *Server) removeRouteByHash(srvIDHash string) {
// Returns the route with given hash or nil if not found.
// This is for gateways only.
func (s *Server) getRouteByHash(hash, accName []byte) (*client, bool) {
id := string(hash)
id := bytesToString(hash)
var perAccount bool
if v, ok := s.accRouteByHash.Load(string(accName)); ok {
if v, ok := s.accRouteByHash.Load(bytesToString(accName)); ok {
if v == nil {
id += string(accName)
id += bytesToString(accName)
perAccount = true
} else {
id += strconv.Itoa(v.(int))
@@ -2774,7 +2773,7 @@ func (s *Server) getRouteByHash(hash, accName []byte) (*client, bool) {
return v.(*client), perAccount
} else if !perAccount {
// Check if we have a "no pool" connection at index 0.
if v, ok := s.gateway.routesIDByHash.Load(string(hash) + "0"); ok {
if v, ok := s.gateway.routesIDByHash.Load(bytesToString(hash) + "0"); ok {
if r := v.(*client); r != nil {
r.mu.Lock()
noPool := r.route.noPool
@@ -3051,13 +3050,13 @@ func (c *client) gatewayAllSubsReceiveStart(info *Info) {
// <Invoked from outbound connection's readLoop>
func (c *client) gatewayAllSubsReceiveComplete(info *Info) {
account := getAccountFromGatewayCommand(c, info, "complete")
if account == "" {
if account == _EMPTY_ {
return
}
// Done receiving all subs from remote. Set the `ni`
// map to nil so that gatewayInterest() no longer
// uses it.
ei, _ := c.gw.outsim.Load(string(account))
ei, _ := c.gw.outsim.Load(account)
if ei != nil {
e := ei.(*outsie)
// Needs locking here since `ni` is checked by
@@ -3077,7 +3076,7 @@ func getAccountFromGatewayCommand(c *client, info *Info, cmd string) string {
if info.GatewayCmdPayload == nil {
c.sendErrAndErr(fmt.Sprintf("Account absent from receive-all-subscriptions-%s command", cmd))
c.closeConnection(ProtocolViolation)
return ""
return _EMPTY_
}
return string(info.GatewayCmdPayload)
}
@@ -3113,7 +3112,7 @@ func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie, accName string) {
info := Info{
Gateway: s.gateway.name,
GatewayCmd: cmd,
GatewayCmdPayload: []byte(accName),
GatewayCmdPayload: stringToBytes(accName),
}
b, _ := json.Marshal(&info)
@@ -3138,7 +3137,7 @@ func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie, accName string) {
s.startGoRoutine(func() {
defer s.grWG.Done()
s.sendAccountSubsToGateway(c, []byte(accName))
s.sendAccountSubsToGateway(c, accName)
// Send the complete command. When the remote receives
// this, it will not send a message unless it has a
// matching sub from us.

View File

@@ -1991,9 +1991,9 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
js.mu.Unlock()
return nil
}
js.mu.Unlock()
s.Debugf("JetStream cluster creating raft group:%+v", rg)
js.mu.Unlock()
sysAcc := s.SystemAccount()
if sysAcc == nil {
@@ -4300,8 +4300,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
} else {
// Clustered consumer.
// Start our monitoring routine if needed.
if !alreadyRunning && !o.isMonitorRunning() {
o.monitorWg.Add(1)
if !alreadyRunning && o.shouldStartMonitor() {
s.startGoRoutine(
func() { js.monitorConsumer(o, ca) },
pprofLabels{
@@ -4508,19 +4507,13 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
s, n, cc := js.server(), o.raftNode(), js.cluster
defer s.grWG.Done()
defer o.monitorWg.Done()
defer o.clearMonitorRunning()
if n == nil {
s.Warnf("No RAFT group for '%s > %s > %s'", o.acc.Name, ca.Stream, ca.Name)
return
}
// Make sure only one is running.
if o.checkInMonitor() {
return
}
defer o.clearMonitorRunning()
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
// This should be below the checkInMonitor call though to avoid stopping it out
// from underneath the one that is running since it will be the same raft node.
@@ -7439,7 +7432,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.mu.RLock()
canRespond := !mset.cfg.NoAck && len(reply) > 0
name, stype := mset.cfg.Name, mset.cfg.Storage
name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
@@ -7539,6 +7532,26 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// Some header checks can be checked pre proposal. Most can not.
if len(hdr) > 0 {
// Expected last sequence per subject.
// We can check for last sequence per subject but only if the expected seq <= lseq.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 && seq <= lseq {
var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(subject, &smv)
if sm != nil {
fseq = sm.seq
}
if err != nil || fseq != seq {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(fseq)
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
}
}
// Expected stream name can also be pre-checked.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
if canRespond {

View File

@@ -828,7 +828,7 @@ func (c *client) sendLeafConnect(clusterName string, headers bool) error {
sigraw, _ := kp.Sign(c.nonce)
sig := base64.RawURLEncoding.EncodeToString(sigraw)
cinfo.JWT = string(tmp)
cinfo.JWT = bytesToString(tmp)
cinfo.Sig = sig
} else if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
cinfo.User = userInfo.Username()
@@ -1039,7 +1039,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
// Remember the nonce we sent here for signatures, etc.
c.nonce = make([]byte, nonceLen)
copy(c.nonce, nonce[:])
info.Nonce = string(c.nonce)
info.Nonce = bytesToString(c.nonce)
info.CID = c.cid
proto := generateInfoJSON(info)
if !opts.LeafNode.TLSHandshakeFirst {
@@ -1363,7 +1363,7 @@ func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompre
cid := c.cid
var nonce string
if !didSolicit {
nonce = string(c.nonce)
nonce = bytesToString(c.nonce)
}
c.mu.Unlock()
@@ -1970,16 +1970,15 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
rc := c.leaf.remoteCluster
c.leaf.smap = make(map[string]int32)
for _, sub := range subs {
subj := string(sub.subject)
// Check perms regardless of role.
if !c.canSubscribe(subj) {
c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", subj, accName, accNTag)
if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
continue
}
// We ignore ourselves here.
// Also don't add the subscription if it has a origin cluster and the
// cluster name matches the one of the client we are sending to.
if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) {
if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
count := int32(1)
if len(sub.queue) > 0 && sub.qw > 0 {
count = sub.qw
@@ -2069,7 +2068,7 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
// Capture the cluster even if its empty.
cluster := _EMPTY_
if sub.origin != nil {
cluster = string(sub.origin)
cluster = bytesToString(sub.origin)
}
// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
@@ -2196,19 +2195,15 @@ func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
// Helper function to build the key.
func keyFromSub(sub *subscription) string {
var _rkey [1024]byte
var key []byte
var sb strings.Builder
sb.Grow(len(sub.subject) + len(sub.queue) + 1)
sb.Write(sub.subject)
if sub.queue != nil {
// Just make the key subject spc group, e.g. 'foo bar'
key = _rkey[:0]
key = append(key, sub.subject...)
key = append(key, byte(' '))
key = append(key, sub.queue...)
} else {
key = sub.subject
sb.WriteByte(' ')
sb.Write(sub.queue)
}
return string(key)
return sb.String()
}
// Lock should be held.
@@ -2281,7 +2276,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
acc := c.acc
// Check if we have a loop.
ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
if ldsPrefix && string(sub.subject) == acc.getLDSubject() {
if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
c.mu.Unlock()
c.handleLeafNodeLoop(true)
return nil
@@ -2298,11 +2293,14 @@ func (c *client) processLeafSub(argo []byte) (err error) {
}
// If we are a hub check that we can publish to this subject.
if checkPerms && subjectIsLiteral(string(sub.subject)) && !c.pubAllowedFullCheck(string(sub.subject), true, true) {
c.mu.Unlock()
c.leafSubPermViolation(sub.subject)
c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
return nil
if checkPerms {
subj := string(sub.subject)
if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
c.mu.Unlock()
c.leafSubPermViolation(sub.subject)
c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
return nil
}
}
// Check if we have a maximum on the number of subscriptions.
@@ -2324,7 +2322,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
} else {
sub.sid = arg
}
key := string(sub.sid)
key := bytesToString(sub.sid)
osub := c.subs[key]
updateGWs := false
delta := int32(1)

View File

@@ -284,7 +284,18 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
if ts <= ms.msgs[ms.state.FirstSeq].ts {
return ms.state.FirstSeq
}
last := ms.msgs[ms.state.LastSeq].ts
// LastSeq is not guaranteed to be present since last does not go backwards.
var lmsg *StoreMsg
for lseq := ms.state.LastSeq; lseq > ms.state.FirstSeq; lseq-- {
if lmsg = ms.msgs[lseq]; lmsg != nil {
break
}
}
if lmsg == nil {
return ms.state.FirstSeq
}
last := lmsg.ts
if ts == last {
return ms.state.LastSeq
}
@@ -292,7 +303,10 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
return ms.state.LastSeq + 1
}
index := sort.Search(len(ms.msgs), func(i int) bool {
return ms.msgs[uint64(i)+ms.state.FirstSeq].ts >= ts
if msg := ms.msgs[ms.state.FirstSeq+uint64(i)]; msg != nil {
return msg.ts >= ts
}
return false
})
return uint64(index) + ms.state.FirstSeq
}
@@ -550,6 +564,9 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
// Will check the msg limit and drop firstSeq msg if needed.
// Lock should be held.
func (ms *memStore) enforceMsgLimit() {
if ms.cfg.Discard != DiscardOld {
return
}
if ms.cfg.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.cfg.MaxMsgs) {
return
}
@@ -561,6 +578,9 @@ func (ms *memStore) enforceMsgLimit() {
// Will check the bytes limit and drop msgs if needed.
// Lock should be held.
func (ms *memStore) enforceBytesLimit() {
if ms.cfg.Discard != DiscardOld {
return
}
if ms.cfg.MaxBytes <= 0 || ms.state.Bytes <= uint64(ms.cfg.MaxBytes) {
return
}

View File

@@ -1159,68 +1159,68 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
// Varz will output server information on the monitoring port at /varz.
type Varz struct {
ID string `json:"server_id"`
Name string `json:"server_name"`
Version string `json:"version"`
Proto int `json:"proto"`
GitCommit string `json:"git_commit,omitempty"`
GoVersion string `json:"go"`
Host string `json:"host"`
Port int `json:"port"`
AuthRequired bool `json:"auth_required,omitempty"`
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
IP string `json:"ip,omitempty"`
ClientConnectURLs []string `json:"connect_urls,omitempty"`
WSConnectURLs []string `json:"ws_connect_urls,omitempty"`
MaxConn int `json:"max_connections"`
MaxSubs int `json:"max_subscriptions,omitempty"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPBasePath string `json:"http_base_path"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int32 `json:"max_control_line"`
MaxPayload int `json:"max_payload"`
MaxPending int64 `json:"max_pending"`
Cluster ClusterOptsVarz `json:"cluster,omitempty"`
Gateway GatewayOptsVarz `json:"gateway,omitempty"`
LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"`
MQTT MQTTOptsVarz `json:"mqtt,omitempty"`
Websocket WebsocketOptsVarz `json:"websocket,omitempty"`
JetStream JetStreamVarz `json:"jetstream,omitempty"`
TLSTimeout float64 `json:"tls_timeout"`
WriteDeadline time.Duration `json:"write_deadline"`
Start time.Time `json:"start"`
Now time.Time `json:"now"`
Uptime string `json:"uptime"`
Mem int64 `json:"mem"`
Cores int `json:"cores"`
MaxProcs int `json:"gomaxprocs"`
CPU float64 `json:"cpu"`
Connections int `json:"connections"`
TotalConnections uint64 `json:"total_connections"`
Routes int `json:"routes"`
Remotes int `json:"remotes"`
Leafs int `json:"leafnodes"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
SlowConsumers int64 `json:"slow_consumers"`
Subscriptions uint32 `json:"subscriptions"`
HTTPReqStats map[string]uint64 `json:"http_req_stats"`
ConfigLoadTime time.Time `json:"config_load_time"`
Tags jwt.TagList `json:"tags,omitempty"`
TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"`
TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"`
SystemAccount string `json:"system_account,omitempty"`
PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"`
OCSPResponseCache OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"`
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"`
ID string `json:"server_id"`
Name string `json:"server_name"`
Version string `json:"version"`
Proto int `json:"proto"`
GitCommit string `json:"git_commit,omitempty"`
GoVersion string `json:"go"`
Host string `json:"host"`
Port int `json:"port"`
AuthRequired bool `json:"auth_required,omitempty"`
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"`
IP string `json:"ip,omitempty"`
ClientConnectURLs []string `json:"connect_urls,omitempty"`
WSConnectURLs []string `json:"ws_connect_urls,omitempty"`
MaxConn int `json:"max_connections"`
MaxSubs int `json:"max_subscriptions,omitempty"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPBasePath string `json:"http_base_path"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int32 `json:"max_control_line"`
MaxPayload int `json:"max_payload"`
MaxPending int64 `json:"max_pending"`
Cluster ClusterOptsVarz `json:"cluster,omitempty"`
Gateway GatewayOptsVarz `json:"gateway,omitempty"`
LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"`
MQTT MQTTOptsVarz `json:"mqtt,omitempty"`
Websocket WebsocketOptsVarz `json:"websocket,omitempty"`
JetStream JetStreamVarz `json:"jetstream,omitempty"`
TLSTimeout float64 `json:"tls_timeout"`
WriteDeadline time.Duration `json:"write_deadline"`
Start time.Time `json:"start"`
Now time.Time `json:"now"`
Uptime string `json:"uptime"`
Mem int64 `json:"mem"`
Cores int `json:"cores"`
MaxProcs int `json:"gomaxprocs"`
CPU float64 `json:"cpu"`
Connections int `json:"connections"`
TotalConnections uint64 `json:"total_connections"`
Routes int `json:"routes"`
Remotes int `json:"remotes"`
Leafs int `json:"leafnodes"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
SlowConsumers int64 `json:"slow_consumers"`
Subscriptions uint32 `json:"subscriptions"`
HTTPReqStats map[string]uint64 `json:"http_req_stats"`
ConfigLoadTime time.Time `json:"config_load_time"`
Tags jwt.TagList `json:"tags,omitempty"`
TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"`
TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"`
SystemAccount string `json:"system_account,omitempty"`
PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"`
OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"`
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"`
}
// JetStreamVarz contains basic runtime information about jetstream
@@ -1767,7 +1767,7 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64
if s.ocsprc != nil && s.ocsprc.Type() != "none" {
stats := s.ocsprc.Stats()
if stats != nil {
v.OCSPResponseCache = OCSPResponseCacheVarz{
v.OCSPResponseCache = &OCSPResponseCacheVarz{
s.ocsprc.Type(),
stats.Hits,
stats.Misses,

View File

@@ -183,6 +183,11 @@ const (
// For Websocket URLs
mqttWSPath = "/mqtt"
mqttInitialPubHeader = 16 // An overkill, should need 7 bytes max
mqttProcessSubTooLong = 100 * time.Millisecond
mqttRetainedCacheTTL = 2 * time.Minute
mqttRetainedTransferTimeout = 10 * time.Second
)
var (
@@ -237,6 +242,7 @@ type mqttAccountSessionManager struct {
flapTimer *time.Timer // Timer to perform some cleanup of the flappers map
sl *Sublist // sublist allowing to find retained messages for given subscription
retmsgs map[string]*mqttRetainedMsgRef // retained messages
rmsCache sync.Map // map[string(subject)]mqttRetainedMsg
jsa mqttJSA
rrmLastSeq uint64 // Restore retained messages expected last sequence
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
@@ -269,7 +275,13 @@ type mqttRetMsgDel struct {
}
type mqttSession struct {
mu sync.Mutex
// subsMu is a "quick" version of the session lock, sufficient for the QoS0
// callback. It only guarantees that a new subscription is initialized, and
// its retained messages if any have been queued up for delivery. The QoS12
// callback uses the session lock.
mu sync.Mutex
subsMu sync.RWMutex
id string // client ID
idHash string // client ID hash
c *client
@@ -321,6 +333,8 @@ type mqttRetainedMsg struct {
Msg []byte `json:"msg,omitempty"`
Flags byte `json:"flags,omitempty"`
Source string `json:"source,omitempty"`
expiresFromCache time.Time
}
type mqttRetainedMsgRef struct {
@@ -329,13 +343,25 @@ type mqttRetainedMsgRef struct {
sub *subscription
}
// mqttSub contains fields associated with a MQTT subscription, and is added to
// the main subscription struct for MQTT message delivery subscriptions. The
// delivery callbacks may get invoked before sub.mqtt is set up, so they should
// acquire either sess.mu or sess.subsMu before accessing it.
type mqttSub struct {
qos byte
// Pending serialization of retained messages to be sent when subscription is registered
prm *mqttWriter
// This is the JS durable name this subscription is attached to.
// The sub's QOS and the JS durable name. They can change when
// re-subscribing, and are used in the delivery callbacks. They can be
// quickly accessed using sess.subsMu.RLock, or under the main session lock.
qos byte
jsDur string
// If this subscription needs to be checked for being reserved. E.g. # or * or */
// Pending serialization of retained messages to be sent when subscription
// is registered. The sub's delivery callbacks must wait until `prm` is
// ready (can block on sess.mu for that, too).
prm [][]byte
// If this subscription needs to be checked for being reserved. E.g. '#' or
// '*' or '*/'. It is set up at the time of subscription and is immutable
// after that.
reserved bool
}
@@ -1090,7 +1116,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
c := s.createInternalAccountClient()
c.acc = acc
id := getHash(s.Name())
id := s.NodeName()
replicas := opts.MQTT.StreamReplicas
if replicas <= 0 {
replicas = s.mqttDetermineReplicas()
@@ -1203,6 +1229,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
as.sendJSAPIrequests(s, c, accName, closeCh)
})
// Start the go routine that will clean up cached retained messages that expired.
s.startGoRoutine(func() {
defer s.grWG.Done()
as.cleaupRetainedMessageCache(s, closeCh)
})
lookupStream := func(stream, txt string) (*StreamInfo, error) {
si, err := jsa.lookupStream(stream)
if err != nil {
@@ -1301,10 +1333,13 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
}
// This is the only case where we need "si" after lookup/create
needToTransfer := true
si, err := lookupStream(mqttRetainedMsgsStreamName, "retained messages")
if err != nil {
switch {
case err != nil:
return nil, err
} else if si == nil {
case si == nil:
// Create the stream for retained messages.
cfg := &StreamConfig{
Name: mqttRetainedMsgsStreamName,
@@ -1327,7 +1362,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
return nil, err
}
}
needToTransfer = false
default:
needToTransfer = si.Config.MaxMsgsPer != 1
}
// Doing this check outside of above if/else due to possible race when
// creating the stream.
wantedSubj := mqttRetainedMsgsStreamSubject + ">"
@@ -1338,30 +1378,49 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
return nil, fmt.Errorf("failed to update stream config: %w", err)
}
}
// Try to transfer regardless if we have already updated the stream or not
// in case not all messages were transferred and the server was restarted.
if as.transferRetainedToPerKeySubjectStream(s) {
transferRMS := func() error {
if !needToTransfer {
return nil
}
as.transferRetainedToPerKeySubjectStream(s)
// We need another lookup to have up-to-date si.State values in order
// to load all retained messages.
si, err = lookupStream(mqttRetainedMsgsStreamName, "retained messages")
if err != nil {
return nil, err
return err
}
needToTransfer = false
return nil
}
// Attempt to transfer all "single subject" retained messages to new
// subjects. It may fail, will log its own error; ignore it the first time
// and proceed to updating MaxMsgsPer. Then we invoke transferRMS() again,
// which will get another chance to resolve the error; if not we bail there.
if err = transferRMS(); err != nil {
return nil, err
}
// Now, if the stream does not have MaxMsgsPer set to 1, and there are no
// more messages on the single $MQTT.rmsgs subject, update the stream again.
if si.Config.MaxMsgsPer != 1 {
_, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs")
// Looking for an error indicated that there is no such message.
if err != nil && IsNatsErr(err, JSNoMessageFoundErr) {
si.Config.MaxMsgsPer = 1
// We will need an up-to-date si, so don't use local variable here.
if si, err = jsa.updateStream(&si.Config); err != nil {
return nil, fmt.Errorf("failed to update stream config: %w", err)
}
si.Config.MaxMsgsPer = 1
// We will need an up-to-date si, so don't use local variable here.
if si, err = jsa.updateStream(&si.Config); err != nil {
return nil, fmt.Errorf("failed to update stream config: %w", err)
}
}
// If we failed the first time, there is now at most one lingering message
// in the old subject. Try again (it will be a NO-OP if succeeded the first
// time).
if err = transferRMS(); err != nil {
return nil, err
}
var lastSeq uint64
var rmDoneCh chan struct{}
st := si.State
@@ -1466,20 +1525,21 @@ func (jsa *mqttJSA) prefixDomain(subject string) string {
func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []byte, timeout time.Duration) (interface{}, error) {
var sb strings.Builder
jsa.mu.Lock()
// Either we use nuid.Next() which uses a global lock, or our own nuid object, but
// then it needs to be "write" protected. This approach will reduce across account
// contention since we won't use the global nuid's lock.
jsa.mu.Lock()
uid := jsa.nuid.Next()
sb.WriteString(jsa.rplyr)
jsa.mu.Unlock()
sb.WriteString(kind)
sb.WriteByte(btsep)
if cidHash != _EMPTY_ {
sb.WriteString(cidHash)
sb.WriteByte(btsep)
}
sb.WriteString(jsa.nuid.Next())
jsa.mu.Unlock()
sb.WriteString(uid)
reply := sb.String()
ch := make(chan interface{}, 1)
@@ -1797,9 +1857,7 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
seq, _, _ := ackReplyInfo(reply)
// Handle this retained message
rf := &mqttRetainedMsgRef{}
rf.sseq = seq
as.handleRetainedMsg(rm.Subject, rf)
as.handleRetainedMsg(rm.Subject, &mqttRetainedMsgRef{sseq: seq}, rm)
// If we were recovering (lastSeq > 0), then check if we are done.
if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq {
@@ -1937,6 +1995,40 @@ func (as *mqttAccountSessionManager) createSubscription(subject string, cb msgHa
return nil
}
// A timer loop to cleanup up expired cached retained messages for a given MQTT account.
// The closeCh is used by the caller to be able to interrupt this routine
// if the rest of the initialization fails, since the quitCh is really
// only used when the server shutdown.
//
// No lock held on entry.
func (as *mqttAccountSessionManager) cleaupRetainedMessageCache(s *Server, closeCh chan struct{}) {
tt := time.NewTicker(mqttRetainedCacheTTL)
defer tt.Stop()
for {
select {
case <-tt.C:
// Set a limit to the number of retained messages to scan since we
// lock as for it. Since the map enumeration gives random order we
// should eventually clean up everything.
i, maxScan := 0, 10*1000
now := time.Now()
as.rmsCache.Range(func(key, value interface{}) bool {
rm := value.(mqttRetainedMsg)
if now.After(rm.expiresFromCache) {
as.rmsCache.Delete(key)
}
i++
return i < maxScan
})
case <-closeCh:
return
case <-s.quitCh:
return
}
}
}
// Loop to send JS API requests for a given MQTT account.
// The closeCh is used by the caller to be able to interrupt this routine
// if the rest of the initialization fails, since the quitCh is really
@@ -2031,7 +2123,7 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc
// If a message for this topic already existed, the existing record is updated
// with the provided information.
// Lock not held on entry.
func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) {
func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rf *mqttRetainedMsgRef, rm *mqttRetainedMsg) {
as.mu.Lock()
defer as.mu.Unlock()
if as.retmsgs == nil {
@@ -2042,11 +2134,11 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai
if erm, exists := as.retmsgs[key]; exists {
// If the new sequence is below the floor or the existing one,
// then ignore the new one.
if rm.sseq <= erm.sseq || rm.sseq <= erm.floor {
if rf.sseq <= erm.sseq || rf.sseq <= erm.floor {
return
}
// Capture existing sequence number so we can return it as the old sequence.
erm.sseq = rm.sseq
erm.sseq = rf.sseq
// Clear the floor
erm.floor = 0
// If sub is nil, it means that it was removed from sublist following a
@@ -2055,12 +2147,23 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai
erm.sub = &subscription{subject: []byte(key)}
as.sl.Insert(erm.sub)
}
// Update the in-memory retained message cache but only for messages
// that are already in the cache, i.e. have been (recently) used.
if rm != nil {
if _, ok := as.rmsCache.Load(key); ok {
toStore := *rm
toStore.expiresFromCache = time.Now().Add(mqttRetainedCacheTTL)
as.rmsCache.Store(key, toStore)
}
}
return
}
}
rm.sub = &subscription{subject: []byte(key)}
as.retmsgs[key] = rm
as.sl.Insert(rm.sub)
rf.sub = &subscription{subject: []byte(key)}
as.retmsgs[key] = rf
as.sl.Insert(rf.sub)
}
// Removes the retained message for the given `subject` if present, and returns the
@@ -2077,6 +2180,7 @@ func (as *mqttAccountSessionManager) handleRetainedMsgDel(subject string, seq ui
as.sl = NewSublistWithCache()
}
if erm, ok := as.retmsgs[subject]; ok {
as.rmsCache.Delete(subject)
if erm.sub != nil {
as.sl.Remove(erm.sub)
erm.sub = nil
@@ -2164,10 +2268,39 @@ func (as *mqttAccountSessionManager) removeSession(sess *mqttSession, lock bool)
}
}
// Helpers that sets the sub's mqtt fields and possibly serialize
// (pre-loaded) retained messages.
// Session lock held on entry.
func (sess *mqttSession) processSub(c *client, subject, sid []byte, isReserved bool, qos byte, jsDurName string, h msgHandler, initShadow bool) (*subscription, error) {
// Helper to set the sub's mqtt fields and possibly serialize (pre-loaded)
// retained messages.
//
// Session lock held on entry. Acquires the subs lock and holds it for
// the duration. Non-MQTT messages coming into mqttDeliverMsgCbQoS0 will be
// waiting.
func (sess *mqttSession) processSub(
// subscribing client.
c *client,
// subscription parameters.
subject, sid []byte, isReserved bool, qos byte, jsDurName string, h msgHandler,
// do we need to scan for shadow subscriptions? (we don't do it for QOS1+)
initShadow bool,
// len(rms) > 0 means to deliver retained messages for the subscription.
rms map[string]*mqttRetainedMsg,
// trace serialized retained messages in the log.
trace bool,
// the retained messages are kept in the account session manager.
as *mqttAccountSessionManager,
) (*subscription, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
if elapsed > mqttProcessSubTooLong {
c.Warnf("Took too long to process subscription for %q: %v", subject, elapsed)
}
}()
// Hold subsMu to prevent QOS0 messages callback from doing anything until
// the (MQTT) sub is initialized.
sess.subsMu.Lock()
defer sess.subsMu.Unlock()
sub, err := c.processSub(subject, nil, sid, h, false)
if err != nil {
// c.processSub already called c.Errorf(), so no need here.
@@ -2179,12 +2312,24 @@ func (sess *mqttSession) processSub(c *client, subject, sid []byte, isReserved b
}
for _, ss := range subs {
if ss.mqtt == nil {
ss.mqtt = &mqttSub{}
// reserved is set only once and once the subscription has been
// created it can be considered immutable.
ss.mqtt = &mqttSub{
reserved: isReserved,
}
}
// QOS and jsDurName can be changed on an existing subscription, so
// accessing it later requires a lock.
ss.mqtt.qos = qos
ss.mqtt.reserved = isReserved
ss.mqtt.jsDur = jsDurName
}
if len(rms) > 0 {
for _, ss := range subs {
as.serializeRetainedMsgsForSub(rms, sess, c, ss, trace)
}
}
return sub, nil
}
@@ -2297,12 +2442,6 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
sess.cons[sid] = cc
}
serializeRMS := func(sub *subscription) {
for _, ss := range append([]*subscription{sub}, sub.shadow...) {
as.serializeRetainedMsgsForSub(rms, sess, c, ss, trace)
}
}
var err error
subs := make([]*subscription, 0, len(filters))
for _, f := range filters {
@@ -2315,21 +2454,28 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
bsubject := []byte(subject)
sid := subject
bsid := bsubject
isReserved := isMQTTReservedSubscription(subject)
var jscons *ConsumerConfig
var jssub *subscription
// Note that if a subscription already exists on this subject, the
// existing sub is returned. Need to update the qos.
var sub *subscription
var err error
const processShadowSubs = true
as.mu.Lock()
sess.mu.Lock()
sub, err := sess.processSub(c, bsubject, bsid,
isMQTTReservedSubscription(subject), f.qos, _EMPTY_, mqttDeliverMsgCbQoS0, true)
if err == nil && fromSubProto {
serializeRMS(sub)
}
sub, err = sess.processSub(c,
bsubject, bsid, isReserved, f.qos, // main subject
_EMPTY_, mqttDeliverMsgCbQoS0, // no jsDur for QOS0
processShadowSubs,
rms, trace, as) // rms is empty if not fromSubProto
sess.mu.Unlock()
as.mu.Unlock()
if err != nil {
f.qos = mqttSubAckFailure
sess.cleanupFailedSub(c, sub, jscons, jssub)
@@ -2356,11 +2502,11 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
// See note above about existing subscription.
as.mu.Lock()
sess.mu.Lock()
fwcsub, err = sess.processSub(c, []byte(fwcsubject), []byte(fwcsid),
isMQTTReservedSubscription(subject), f.qos, _EMPTY_, mqttDeliverMsgCbQoS0, true)
if err == nil && fromSubProto {
serializeRMS(fwcsub)
}
fwcsub, err = sess.processSub(c,
[]byte(fwcsubject), []byte(fwcsid), isReserved, f.qos, // FWC (top-level wildcard) subject
_EMPTY_, mqttDeliverMsgCbQoS0, // no jsDur for QOS0
processShadowSubs,
rms, trace, as) // rms is empty if not fromSubProto
sess.mu.Unlock()
as.mu.Unlock()
if err != nil {
@@ -2411,14 +2557,13 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(rms map[string]
return
}
for _, psub := range result.psubs {
rm, ok := rms[string(psub.subject)]
if !ok {
rm := rms[string(psub.subject)]
if rm == nil {
// This should not happen since we pre-load messages into the cache
// before calling serialize.
continue
}
if sub.mqtt.prm == nil {
sub.mqtt.prm = &mqttWriter{}
}
prm := sub.mqtt.prm
var pi uint16
qos := mqttGetQoS(rm.Flags)
if qos > sub.mqtt.qos {
@@ -2441,7 +2586,10 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(rms map[string]
// Need to use the subject for the retained message, not the `sub` subject.
// We can find the published retained message in rm.sub.subject.
// Set the RETAIN flag: [MQTT-3.3.1-8].
flags := mqttSerializePublishMsg(prm, pi, qos, false, true, []byte(rm.Topic), rm.Msg)
flags, headerBytes := mqttMakePublishHeader(pi, qos, false, true, []byte(rm.Topic), len(rm.Msg))
c.mu.Lock()
sub.mqtt.prm = append(sub.mqtt.prm, headerBytes, rm.Msg)
c.mu.Unlock()
if trace {
pp := mqttPublish{
topic: []byte(rm.Topic),
@@ -2475,6 +2623,16 @@ func (as *mqttAccountSessionManager) loadRetainedMessagesForSubject(rms map[stri
if rms[subject] != nil {
continue // already loaded
}
// See if we have the retained message in the cache.
if rmv, _ := as.rmsCache.Load(subject); rmv != nil {
rm := rmv.(mqttRetainedMsg)
rms[subject] = &rm
continue
}
// Load the retained message from the stream, and cache it for reuse in
// the near future.
loadSubject := mqttRetainedMsgsStreamSubject + subject
jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, loadSubject)
if err != nil || jsm == nil {
@@ -2486,6 +2644,10 @@ func (as *mqttAccountSessionManager) loadRetainedMessagesForSubject(rms map[stri
log.Warnf("failed to decode retained message for subject %q: %v", loadSubject, err)
continue
}
// Add the loaded retained message to the cache.
rm.expiresFromCache = time.Now().Add(mqttRetainedCacheTTL)
as.rmsCache.Store(subject, rm)
rms[subject] = &rm
}
}
@@ -2616,55 +2778,59 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve
retry = false
}
func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) bool {
func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) error {
jsa := &as.jsa
var count, errors int
var processed int
var transferred int
start := time.Now()
deadline := start.Add(mqttRetainedTransferTimeout)
for {
// Try and look up messages on the original undivided "$MQTT.rmsgs" subject.
// If nothing is returned here, we assume to have migrated all old messages.
smsg, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs")
if err != nil {
if IsNatsErr(err, JSNoMessageFoundErr) {
// We've ran out of messages to transfer so give up.
break
}
log.Warnf(" Unable to load retained message from '$MQTT.rmsgs': %s", err)
errors++
if IsNatsErr(err, JSNoMessageFoundErr) {
// We've ran out of messages to transfer, done.
break
}
if err != nil {
log.Warnf(" Unable to transfer a retained message: failed to load from '$MQTT.rmsgs': %s", err)
return err
}
// Unmarshal the message so that we can obtain the subject name.
var rmsg mqttRetainedMsg
if err := json.Unmarshal(smsg.Data, &rmsg); err != nil {
if err = json.Unmarshal(smsg.Data, &rmsg); err == nil {
// Store the message again, this time with the new per-key subject.
subject := mqttRetainedMsgsStreamSubject + rmsg.Subject
if _, err = jsa.storeMsg(subject, 0, smsg.Data); err != nil {
log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err)
}
transferred++
} else {
log.Warnf(" Unable to unmarshal retained message with sequence %d, skipping", smsg.Sequence)
errors++
continue
}
// Store the message again, this time with the new per-key subject.
subject := mqttRetainedMsgsStreamSubject + rmsg.Subject
if _, err := jsa.storeMsg(subject, 0, smsg.Data); err != nil {
log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err)
errors++
continue
}
// Delete the original message.
if err := jsa.deleteMsg(mqttRetainedMsgsStreamName, smsg.Sequence, true); err != nil {
log.Errorf(" Unable to clean up the retained message with sequence %d: %v", smsg.Sequence, err)
errors++
continue
return err
}
processed++
now := time.Now()
if now.After(deadline) {
err := fmt.Errorf("timed out while transferring retained messages from '$MQTT.rmsgs' after %v, %d processed, %d successfully transferred", now.Sub(start), processed, transferred)
log.Noticef(err.Error())
return err
}
count++
}
if errors > 0 {
next := mqttDefaultTransferRetry
log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next)
time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) })
} else if count > 0 {
log.Noticef("Transfer of %d MQTT retained messages done!", count)
if processed > 0 {
log.Noticef("Processed %d messages from '$MQTT.rmsgs', successfully transferred %d in %v", processed, transferred, time.Since(start))
} else {
log.Debugf("No messages found to transfer from '$MQTT.rmsgs'")
}
// Signal if there was any activity (either some transferred or some errors)
return errors > 0 || count > 0
return nil
}
//////////////////////////////////////////////////////////////////////////////
@@ -3848,7 +4014,7 @@ func (c *client) mqttHandlePubRetain() {
Origin: asm.jsa.id,
Subject: key,
Topic: string(pp.topic),
Msg: copyBytes(pp.msg),
Msg: pp.msg,
Flags: pp.flags,
Source: c.opts.Username,
}
@@ -3860,7 +4026,7 @@ func (c *client) mqttHandlePubRetain() {
sseq: smr.Sequence,
}
// Add/update the map
asm.handleRetainedMsg(key, rf)
asm.handleRetainedMsg(key, rf, rm)
} else {
c.mu.Lock()
acc := c.acc
@@ -4016,28 +4182,6 @@ func pubAllowed(perms *perm, subject string) bool {
return allowed
}
func mqttWritePublish(w *mqttWriter, qos byte, dup, retain bool, subject string, pi uint16, payload []byte) {
flags := qos << 1
if dup {
flags |= mqttPubFlagDup
}
if retain {
flags |= mqttPubFlagRetain
}
w.WriteByte(mqttPacketPub | flags)
pkLen := 2 + len(subject) + len(payload)
if qos > 0 {
pkLen += 2
}
w.WriteVarInt(pkLen)
w.WriteString(subject)
if qos > 0 {
w.WriteUint16(pi)
}
w.Write([]byte(payload))
}
func (c *client) mqttEnqueuePubResponse(packetType byte, pi uint16, trace bool) {
proto := [4]byte{packetType, 0x2, 0, 0}
proto[2] = byte(pi >> 8)
@@ -4271,32 +4415,32 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
return
}
hdr, msg := pc.msgParts(rmsg)
// This is the client associated with the subscription.
cc := sub.client
// This is immutable
sess := cc.mqtt.sess
// Check the subscription's QoS. This needs to be protected because
// the client may change an existing subscription at any time.
sess.mu.Lock()
// Lock here, otherwise we may be called with sub.mqtt == nil. Ignore
// wildcard subscriptions if this subject starts with '$', per Spec
// [MQTT-4.7.2-1].
sess.subsMu.RLock()
subQoS := sub.mqtt.qos
isReservedSub := mqttIsReservedSub(sub, subject)
sess.mu.Unlock()
ignore := mqttMustIgnoreForReservedSub(sub, subject)
sess.subsMu.RUnlock()
// We have a wildcard subscription and this subject starts with '$' so ignore per Spec [MQTT-4.7.2-1].
if isReservedSub {
if ignore {
return
}
hdr, msg := pc.msgParts(rmsg)
var topic []byte
if pc.isMqtt() {
// This is an MQTT publisher directly connected to this server.
// If the message was published with a QoS > 0 and the sub has the QoS >
// 0 then the message will be delivered by the other callback.
// Check the subscription's QoS. If the message was published with a
// QoS>0 and the sub has the QoS>0 then the message will be delivered by
// mqttDeliverMsgCbQoS12.
msgQoS := mqttGetQoS(pc.mqtt.pp.flags)
if subQoS > 0 && msgQoS > 0 {
return
@@ -4310,8 +4454,13 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
} else {
// Non MQTT client, could be NATS publisher, or ROUTER, etc..
h := mqttParsePublishNATSHeader(hdr)
// If the message does not have the MQTT header, it is not a MQTT and
// should be delivered here, at QOS0. If it does have the header, we
// need to lock the session to check the sub QoS, and then ignore the
// message if the Sub wants higher QOS delivery. It will be delivered by
// mqttDeliverMsgCbQoS12.
if subQoS > 0 && h != nil && h.qos > 0 {
// will be delivered by the JetStream callback
return
}
@@ -4356,8 +4505,9 @@ func mqttDeliverMsgCbQoS12(sub *subscription, pc *client, _ *Account, subject, r
// This is immutable
sess := cc.mqtt.sess
// We lock to check some of the subscription's fields and if we need to
// keep track of pending acks, etc..
// We lock to check some of the subscription's fields and if we need to keep
// track of pending acks, etc. There is no need to acquire the subsMu RLock
// since sess.Lock is overarching for modifying subscriptions.
sess.mu.Lock()
if sess.c != cc || sub.mqtt == nil {
sess.mu.Unlock()
@@ -4382,8 +4532,7 @@ func mqttDeliverMsgCbQoS12(sub *subscription, pc *client, _ *Account, subject, r
// Check for reserved subject violation. If so, we will send the ack to
// remove the message, and do nothing else.
strippedSubj := string(subject[len(mqttStreamSubjectPrefix):])
isReservedSub := mqttIsReservedSub(sub, strippedSubj)
if isReservedSub {
if mqttMustIgnoreForReservedSub(sub, strippedSubj) {
sess.mu.Unlock()
sess.jsa.sendAck(reply)
return
@@ -4432,12 +4581,12 @@ func mqttDeliverPubRelCb(sub *subscription, pc *client, _ *Account, subject, rep
cc.mqttEnqueuePubResponse(mqttPacketPubRel, pi, trace)
}
// The MQTT Server MUST NOT match Topic Filters starting with a wildcard character (# or +)
// with Topic Names beginning with a $ character, Spec [MQTT-4.7.2-1].
// We will return true if there is a violation.
// The MQTT Server MUST NOT match Topic Filters starting with a wildcard
// character (# or +) with Topic Names beginning with a $ character, Spec
// [MQTT-4.7.2-1]. We will return true if there is a violation.
//
// Session lock must be held on entry to protect access to sub.mqtt.reserved.
func mqttIsReservedSub(sub *subscription, subject string) bool {
// Session or subMu lock must be held on entry to protect access to sub.mqtt.
func mqttMustIgnoreForReservedSub(sub *subscription, subject string) bool {
// If the subject does not start with $ nothing to do here.
if !sub.mqtt.reserved || len(subject) == 0 || subject[0] != mqttReservedPre {
return false
@@ -4460,17 +4609,17 @@ func isMQTTReservedSubscription(subject string) bool {
// Common function to mqtt delivery callbacks to serialize and send the message
// to the `cc` client.
func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup bool, topic, msg []byte) {
sw := mqttWriter{}
w := &sw
flags := mqttSerializePublishMsg(w, pi, qos, dup, false, topic, msg)
flags, headerBytes := mqttMakePublishHeader(pi, qos, dup, false, topic, len(msg))
cc.mu.Lock()
if sub.mqtt.prm != nil {
cc.queueOutbound(sub.mqtt.prm.Bytes())
for _, data := range sub.mqtt.prm {
cc.queueOutbound(data)
}
sub.mqtt.prm = nil
}
cc.queueOutbound(w.Bytes())
cc.queueOutbound(headerBytes)
cc.queueOutbound(msg)
c.addToPCD(cc)
trace := cc.trace
cc.mu.Unlock()
@@ -4487,10 +4636,9 @@ func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint1
}
// Serializes to the given writer the message for the given subject.
func mqttSerializePublishMsg(w *mqttWriter, pi uint16, qos byte, dup, retained bool, topic, msg []byte) byte {
func (w *mqttWriter) WritePublishHeader(pi uint16, qos byte, dup, retained bool, topic []byte, msgLen int) byte {
// Compute len (will have to add packet id if message is sent as QoS>=1)
pkLen := 2 + len(topic) + len(msg)
pkLen := 2 + len(topic) + msgLen
var flags byte
// Set flags for dup/retained/qos1
@@ -4500,8 +4648,7 @@ func mqttSerializePublishMsg(w *mqttWriter, pi uint16, qos byte, dup, retained b
if retained {
flags |= mqttPubFlagRetain
}
// For now, we have only QoS 1
if pi > 0 {
if qos > 0 {
pkLen += 2
flags |= qos << 1
}
@@ -4509,14 +4656,20 @@ func mqttSerializePublishMsg(w *mqttWriter, pi uint16, qos byte, dup, retained b
w.WriteByte(mqttPacketPub | flags)
w.WriteVarInt(pkLen)
w.WriteBytes(topic)
if pi > 0 {
if qos > 0 {
w.WriteUint16(pi)
}
w.Write(msg)
return flags
}
// Serializes to the given writer the message for the given subject.
func mqttMakePublishHeader(pi uint16, qos byte, dup, retained bool, topic []byte, msgLen int) (byte, []byte) {
headerBuf := newMQTTWriter(mqttInitialPubHeader + len(topic))
flags := headerBuf.WritePublishHeader(pi, qos, dup, retained, topic, msgLen)
return flags, headerBuf.Bytes()
}
// Process the SUBSCRIBE packet.
//
// Process the list of subscriptions and update the given filter
@@ -4741,7 +4894,8 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
sess.mu.Lock()
sess.tmaxack = tmaxack
sub, err := sess.processSub(c, []byte(inbox), []byte(inbox),
isMQTTReservedSubscription(subject), qos, cc.Durable, mqttDeliverMsgCbQoS12, false)
isMQTTReservedSubscription(subject), qos, cc.Durable, mqttDeliverMsgCbQoS12,
false, nil, false, nil) // no shadow subs, no retained message delivery
sess.mu.Unlock()
if err != nil {
@@ -4758,7 +4912,9 @@ func (c *client) mqttSendRetainedMsgsToNewSubs(subs []*subscription) {
c.mu.Lock()
for _, sub := range subs {
if sub.mqtt != nil && sub.mqtt.prm != nil {
c.queueOutbound(sub.mqtt.prm.Bytes())
for _, data := range sub.mqtt.prm {
c.queueOutbound(data)
}
sub.mqtt.prm = nil
}
}
@@ -4767,7 +4923,7 @@ func (c *client) mqttSendRetainedMsgsToNewSubs(subs []*subscription) {
}
func (c *client) mqttEnqueueSubAck(pi uint16, filters []*mqttFilter) {
w := &mqttWriter{}
w := newMQTTWriter(7 + len(filters))
w.WriteByte(mqttPacketSubAck)
// packet length is 2 (for packet identifier) and 1 byte per filter.
w.WriteVarInt(2 + len(filters))
@@ -4849,7 +5005,7 @@ func (c *client) mqttProcessUnsubs(filters []*mqttFilter) error {
}
func (c *client) mqttEnqueueUnsubAck(pi uint16) {
w := &mqttWriter{}
w := newMQTTWriter(4)
w.WriteByte(mqttPacketUnsubAck)
w.WriteVarInt(2)
w.WriteUint16(pi)
@@ -5190,3 +5346,9 @@ func (w *mqttWriter) WriteVarInt(value int) {
}
}
}
func newMQTTWriter(cap int) *mqttWriter {
w := &mqttWriter{}
w.Grow(cap)
return w
}

View File

@@ -3190,7 +3190,13 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var success bool
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
n.resetWAL()
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
success = true
} else {
n.resetWAL()
}
} else {
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.

View File

@@ -146,27 +146,25 @@ func (c *client) removeReplySub(sub *subscription) {
// Lookup the account based on sub.sid.
if i := bytes.Index(sub.sid, []byte(" ")); i > 0 {
// First part of SID for route is account name.
if v, ok := c.srv.accounts.Load(string(sub.sid[:i])); ok {
if v, ok := c.srv.accounts.Load(bytesToString(sub.sid[:i])); ok {
(v.(*Account)).sl.Remove(sub)
}
c.mu.Lock()
delete(c.subs, string(sub.sid))
delete(c.subs, bytesToString(sub.sid))
c.mu.Unlock()
}
}
func (c *client) processAccountSub(arg []byte) error {
accName := string(arg)
if c.kind == GATEWAY {
return c.processGatewayAccountSub(accName)
return c.processGatewayAccountSub(string(arg))
}
return nil
}
func (c *client) processAccountUnsub(arg []byte) {
accName := string(arg)
if c.kind == GATEWAY {
c.processGatewayAccountUnsub(accName)
c.processGatewayAccountUnsub(string(arg))
}
}
@@ -705,7 +703,7 @@ func (c *client) processRouteInfo(info *Info) {
// First INFO, check if this server is configured for compression because
// if that is the case, we need to negotiate it with the remote server.
if needsCompression(opts.Cluster.Compression.Mode) {
accName := string(c.route.accName)
accName := bytesToString(c.route.accName)
// If we did not yet negotiate...
if !c.flags.isSet(compressionNegotiated) {
// Prevent from getting back here.
@@ -935,7 +933,7 @@ func (s *Server) updateRemoteRoutePerms(c *client, info *Info) {
c.opts.Import = info.Import
c.opts.Export = info.Export
routeAcc, poolIdx, noPool := string(c.route.accName), c.route.poolIdx, c.route.noPool
routeAcc, poolIdx, noPool := bytesToString(c.route.accName), c.route.poolIdx, c.route.noPool
c.mu.Unlock()
var (
@@ -1290,9 +1288,9 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) {
// RS- will have the arg exactly as the key.
var key string
if c.kind == ROUTER && c.route != nil && len(c.route.accName) > 0 {
key = accountName + " " + string(arg)
key = accountName + " " + bytesToString(arg)
} else {
key = string(arg)
key = bytesToString(arg)
}
sub, ok := c.subs[key]
if ok {
@@ -1367,7 +1365,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
// If the account name is empty (not a "per-account" route), the account
// is at the index prior to the subject.
if accountName == _EMPTY_ {
accountName = string(args[subjIdx-1])
accountName = bytesToString(args[subjIdx-1])
}
// Lookup account while avoiding fetch.
// A slow fetch delays subsequent remote messages. It also avoids the expired check (see below).
@@ -1413,7 +1411,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
}
// Check permissions if applicable.
if !c.canExport(string(sub.subject)) {
if c.perms != nil && !c.canExport(string(sub.subject)) {
c.mu.Unlock()
c.Debugf("Can not export %q, ignoring remote subscription request", sub.subject)
return nil
@@ -1451,7 +1449,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
sub.sid = append(sub.sid, ' ')
sub.sid = append(sub.sid, sub.subject...)
}
key := string(sub.sid)
key := bytesToString(sub.sid)
acc.mu.RLock()
// For routes (this can be called by leafnodes), check if the account is
@@ -2232,6 +2230,9 @@ func handleDuplicateRoute(remote, c *client, setNoReconnect bool) {
// Import filter check.
func (c *client) importFilter(sub *subscription) bool {
if c.perms == nil {
return true
}
return c.canImport(string(sub.subject))
}
@@ -2854,7 +2855,7 @@ func (s *Server) removeRoute(c *client) {
idHash = r.idHash
gwURL = r.gatewayURL
poolIdx = r.poolIdx
accName = string(r.accName)
accName = bytesToString(r.accName)
if r.noPool {
s.routesNoPool--
noPool = true

View File

@@ -21,6 +21,7 @@ import (
"io"
"strings"
"time"
"unsafe"
"github.com/nats-io/nats-server/v2/server/avl"
)
@@ -714,3 +715,23 @@ func (sm *StoreMsg) clear() {
sm.buf = sm.buf[:0]
}
}
// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer.
// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same.
func bytesToString(b []byte) string {
if len(b) == 0 {
return _EMPTY_
}
p := unsafe.SliceData(b)
return unsafe.String(p, len(b))
}
// Same in reverse. Used less often.
func stringToBytes(s string) []byte {
if len(s) == 0 {
return nil
}
p := unsafe.StringData(s)
b := unsafe.Slice(p, len(s))
return b
}

View File

@@ -5190,6 +5190,11 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
mset.clsMu.Lock()
o.mu.Lock()
if o.closed || o.mset == nil {
o.mu.Unlock()
return
}
if o.sigSubs != nil {
if mset.csl != nil {
for _, sub := range o.sigSubs {
@@ -5206,13 +5211,13 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
// If no filters are preset, add fwcs to sublist for that consumer.
if newFilters == nil {
sub := &subscription{subject: []byte(fwcs), icb: o.processStreamSignal}
o.mset.csl.Insert(sub)
mset.csl.Insert(sub)
o.sigSubs = append(o.sigSubs, sub)
// If there are filters, add their subjects to sublist.
} else {
for _, filter := range newFilters {
sub := &subscription{subject: []byte(filter), icb: o.processStreamSignal}
o.mset.csl.Insert(sub)
mset.csl.Insert(sub)
o.sigSubs = append(o.sigSubs, sub)
}
}

View File

@@ -1269,13 +1269,12 @@ func (s *Server) createWSClient(conn net.Conn, ws *websocket) *client {
}
func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
var nb net.Buffers
nb := c.out.nb
var mfs int
var usz int
if c.ws.browser {
mfs = wsFrameSizeForBrowsers
}
nb = c.out.nb
mask := c.ws.maskwrite
// Start with possible already framed buffers (that we could have
// got from partials or control messages such as ws pings or pongs).
@@ -1378,8 +1377,10 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
for i := 0; i < len(nb); i++ {
b := nb[i]
if total+len(b) <= mfs {
bufs = append(bufs, b)
buf := nbPoolGet(len(b))
bufs = append(bufs, append(buf, b...))
total += len(b)
nbPoolPut(nb[i])
continue
}
for len(b) > 0 {
@@ -1394,9 +1395,11 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if endStart {
fhIdx = startFrame()
}
bufs = append(bufs, b[:total])
buf := nbPoolGet(total)
bufs = append(bufs, append(buf, b[:total]...))
b = b[total:]
}
nbPoolPut(nb[i]) // No longer needed as copied into smaller frames.
}
if total > 0 {
endFrame(fhIdx, total)

6
vendor/modules.txt vendored
View File

@@ -1219,8 +1219,8 @@ github.com/justinas/alice
# github.com/kevinburke/ssh_config v1.2.0
## explicit
github.com/kevinburke/ssh_config
# github.com/klauspost/compress v1.17.2
## explicit; go 1.18
# github.com/klauspost/compress v1.17.4
## explicit; go 1.19
github.com/klauspost/compress/flate
github.com/klauspost/compress/s2
# github.com/klauspost/cpuid/v2 v2.1.0
@@ -1376,7 +1376,7 @@ github.com/mschoch/smat
# github.com/nats-io/jwt/v2 v2.5.3
## explicit; go 1.18
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.10.5
# github.com/nats-io/nats-server/v2 v2.10.7
## explicit; go 1.20
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/ldap