diff --git a/go.mod b/go.mod index 981b9d650c..39d360a19c 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.2.1 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.10.5 + github.com/nats-io/nats-server/v2 v2.10.7 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/ginkgo v1.16.5 @@ -244,7 +244,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/juliangruber/go-intersect v1.1.0 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.1.0 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/libregraph/oidc-go v1.0.0 // indirect diff --git a/go.sum b/go.sum index 7eaf61547c..9db2d10fea 100644 --- a/go.sum +++ b/go.sum @@ -1584,8 +1584,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -1742,8 +1742,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= -github.com/nats-io/nats-server/v2 v2.10.5 h1:hhWt6m9ja/mNnm6ixc85jCthDaiUFPaeJI79K/MD980= -github.com/nats-io/nats-server/v2 v2.10.5/go.mod h1:xUMTU4kS//SDkJCSvFwN9SyJ9nUuLhSkzB/Qz0dvjjg= +github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y= +github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= diff --git a/vendor/github.com/nats-io/nats-server/v2/server/README-MQTT.md b/vendor/github.com/nats-io/nats-server/v2/server/README-MQTT.md index e61a0991e8..cac3933d5f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/README-MQTT.md +++ b/vendor/github.com/nats-io/nats-server/v2/server/README-MQTT.md @@ -142,7 +142,7 @@ Here is the overview of how we set up and use JetStream **streams**, ## JetStream API All interactions with JetStream are performed via `mqttJSA` that sends NATS -requests to JetStream. 
Most are processed syncronously and await a response, +requests to JetStream. Most are processed synchronously and await a response, some (e.g. `jsa.sendAck()`) are sent asynchronously. JetStream API is usually referred to as `jsa` in the code. No special locking is required to use `jsa`, however the asynchronous use of JetStream may create race conditions with diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index 72346e6958..5fabe77c41 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -336,6 +336,9 @@ var nbPoolLarge = &sync.Pool{ }, } +// nbPoolGet returns a frame that is a best-effort match for the given size. +// Once a pooled frame is no longer needed, it should be recycled by passing +// it to nbPoolPut. func nbPoolGet(sz int) []byte { switch { case sz <= nbPoolSizeSmall: @@ -347,6 +350,10 @@ func nbPoolGet(sz int) []byte { } } +// nbPoolPut recycles a frame that was retrieved from nbPoolGet. It is not +// safe to return multiple slices referring to chunks of the same underlying +// array as this may create overlaps when the buffers are returned to their +// original size, resulting in race conditions. func nbPoolPut(b []byte) { switch cap(b) { case nbPoolSizeSmall: @@ -1490,8 +1497,8 @@ func closedStateForErr(err error) ClosedState { return ReadError } -// collapsePtoNB will place primary onto nb buffer as needed in prep for WriteTo. -// This will return a copy on purpose. +// collapsePtoNB will either returned framed WebSocket buffers or it will +// return a reference to c.out.nb. func (c *client) collapsePtoNB() (net.Buffers, int64) { if c.isWebsocket() { return c.wsCollapsePtoNB() @@ -1834,11 +1841,11 @@ func (c *client) traceOutOp(op string, arg []byte) { func (c *client) traceOp(format, op string, arg []byte) { opa := []interface{}{} - if op != "" { + if op != _EMPTY_ { opa = append(opa, op) } if arg != nil { - opa = append(opa, string(arg)) + opa = append(opa, bytesToString(arg)) } c.Tracef(format, opa) } @@ -2531,7 +2538,7 @@ func (c *client) processHeaderPub(arg []byte) error { c.maxPayloadViolation(c.pa.size, maxPayload) return ErrMaxPayload } - if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) { + if c.opts.Pedantic && !IsValidLiteralSubject(bytesToString(c.pa.subject)) { c.sendErr("Invalid Publish Subject") } return nil @@ -2584,7 +2591,7 @@ func (c *client) processPub(arg []byte) error { c.maxPayloadViolation(c.pa.size, maxPayload) return ErrMaxPayload } - if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) { + if c.opts.Pedantic && !IsValidLiteralSubject(bytesToString(c.pa.subject)) { c.sendErr("Invalid Publish Subject") } return nil @@ -2660,7 +2667,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw acc := c.acc srv := c.srv - sid := string(sub.sid) + sid := bytesToString(sub.sid) // This check does not apply to SYSTEM or JETSTREAM or ACCOUNT clients (because they don't have a `nc`...) 
if c.isClosed() && (kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT) { @@ -2795,7 +2802,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription, enact b acc.mu.RUnlock() return nil } - subj := string(sub.subject) + subj := bytesToString(sub.subject) if len(acc.imports.streams) > 0 { tokens = tokenizeSubjectIntoSlice(tsa[:0], subj) for _, tk := range tokens { @@ -2903,7 +2910,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri if im.rtr == nil { im.rtr = im.tr.reverse() } - s := string(nsub.subject) + s := bytesToString(nsub.subject) if ime.overlapSubj != _EMPTY_ { s = ime.overlapSubj } @@ -3000,7 +3007,7 @@ func queueMatches(queue string, qsubs [][]*subscription) bool { } for _, qsub := range qsubs { qs := qsub[0] - qname := string(qs.queue) + qname := bytesToString(qs.queue) // NOTE: '*' and '>' tokens can also be valid // queue names so we first check against the @@ -3020,9 +3027,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool c.mu.Lock() if !force && sub.max > 0 && sub.nm < sub.max { - c.Debugf( - "Deferring actual UNSUB(%s): %d max, %d received", - string(sub.subject), sub.max, sub.nm) + c.Debugf("Deferring actual UNSUB(%s): %d max, %d received", sub.subject, sub.max, sub.nm) c.mu.Unlock() return } @@ -3034,7 +3039,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool // Remove accounting if requested. This will be false when we close a connection // with open subscriptions. if remove { - delete(c.subs, string(sub.sid)) + delete(c.subs, bytesToString(sub.sid)) if acc != nil { acc.sl.Remove(sub) } @@ -3194,7 +3199,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac // Remap subject if its a shadow subscription, treat like a normal client. if rt.sub.im != nil { if rt.sub.im.tr != nil { - to := rt.sub.im.tr.TransformSubject(string(subj)) + to := rt.sub.im.tr.TransformSubject(bytesToString(subj)) subj = []byte(to) } else if !rt.sub.im.usePub { subj = []byte(rt.sub.im.to) @@ -3367,7 +3372,7 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su // still process the message in hand, otherwise // unsubscribe and drop message on the floor. if sub.nm == sub.max { - client.Debugf("Auto-unsubscribe limit of %d reached for sid '%s'", sub.max, string(sub.sid)) + client.Debugf("Auto-unsubscribe limit of %d reached for sid '%s'", sub.max, sub.sid) // Due to defer, reverse the code order so that execution // is consistent with other cases where we unsubscribe. if shouldForward { @@ -3503,7 +3508,7 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su c.addToPCD(client) if client.trace { - client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil) + client.traceOutOp(bytesToString(mh[:len(mh)-LEN_CR_LF]), nil) } client.mu.Unlock() @@ -3699,7 +3704,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo } } else { // Update our cache here. - c.perms.pcache.Store(string(subject), allowed) + c.perms.pcache.Store(subject, allowed) if n := atomic.AddInt32(&c.perms.pcsz, 1); n > maxPermCacheSize { c.prunePubPermsCache() } @@ -3711,7 +3716,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo func isServiceReply(reply []byte) bool { // This function is inlined and checking this way is actually faster // than byte-by-byte comparison. 
- return len(reply) > 3 && string(reply[:4]) == replyPrefix + return len(reply) > 3 && bytesToString(reply[:4]) == replyPrefix } // Test whether a reply subject is a service import or a gateway routed reply. @@ -3721,9 +3726,9 @@ func isReservedReply(reply []byte) bool { } rLen := len(reply) // Faster to check with string([:]) than byte-by-byte - if rLen > jsAckPreLen && string(reply[:jsAckPreLen]) == jsAckPre { + if rLen > jsAckPreLen && bytesToString(reply[:jsAckPreLen]) == jsAckPre { return true - } else if rLen > gwReplyPrefixLen && string(reply[:gwReplyPrefixLen]) == gwReplyPrefix { + } else if rLen > gwReplyPrefixLen && bytesToString(reply[:gwReplyPrefixLen]) == gwReplyPrefix { return true } return false @@ -3745,7 +3750,7 @@ func (c *client) processInboundMsg(msg []byte) { // selectMappedSubject will choose the mapped subject based on the client's inbound subject. func (c *client) selectMappedSubject() bool { - nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject)) + nsubj, changed := c.acc.selectMappedSubject(bytesToString(c.pa.subject)) if changed { c.pa.mapped = c.pa.subject c.pa.subject = []byte(nsubj) @@ -3821,7 +3826,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { c.mu.Lock() rl := c.rrTracking.rmap[string(c.pa.subject)] if rl != nil { - delete(c.rrTracking.rmap, string(c.pa.subject)) + delete(c.rrTracking.rmap, bytesToString(c.pa.subject)) } c.mu.Unlock() @@ -3861,6 +3866,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { // Go back to the sublist data structure. if !ok { + // Match may use the subject here to populate a cache, so can not use bytesToString here. r = acc.sl.Match(string(c.pa.subject)) if len(r.psubs)+len(r.qsubs) > 0 { c.in.results[string(c.pa.subject)] = r @@ -4105,7 +4111,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt var checkJS bool shouldReturn := si.invalid || acc.sl == nil if !shouldReturn && !isResponse && si.to == jsAllAPI { - subj := string(c.pa.subject) + subj := bytesToString(c.pa.subject) if strings.HasPrefix(subj, jsRequestNextPre) || strings.HasPrefix(subj, jsDirectGetPre) { checkJS = true } @@ -4215,7 +4221,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // Set clientInfo if present. if ci != nil { if b, _ := json.Marshal(ci); b != nil { - msg = c.setHeader(ClientInfoHdr, string(b), msg) + msg = c.setHeader(ClientInfoHdr, bytesToString(b), msg) } } } @@ -4447,7 +4453,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } if sub.im.tr != nil { - to := sub.im.tr.TransformSubject(string(subject)) + to := sub.im.tr.TransformSubject(bytesToString(subject)) dsubj = append(_dsubj[:0], to...) } else if sub.im.usePub { dsubj = append(_dsubj[:0], subj...) @@ -4600,7 +4606,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } if sub.im.tr != nil { - to := sub.im.tr.TransformSubject(string(subject)) + to := sub.im.tr.TransformSubject(bytesToString(subject)) dsubj = append(_dsubj[:0], to...) } else if sub.im.usePub { dsubj = append(_dsubj[:0], subj...) @@ -4677,7 +4683,7 @@ sendToRoutesOrLeafs: if dc.kind == LEAF { // Check two scenarios. 
One is inbound from a route (c.pa.origin) if c.kind == ROUTER && len(c.pa.origin) > 0 { - if string(c.pa.origin) == dc.remoteCluster() { + if bytesToString(c.pa.origin) == dc.remoteCluster() { continue } } @@ -4737,7 +4743,7 @@ func (c *client) checkLeafClientInfoHeader(msg []byte) (dmsg []byte, setHdr bool if ci.Account != remoteAcc { ci.Account = remoteAcc if b, _ := json.Marshal(ci); b != nil { - dmsg, setHdr = c.setHeader(ClientInfoHdr, string(b), msg), true + dmsg, setHdr = c.setHeader(ClientInfoHdr, bytesToString(b), msg), true } } } @@ -5162,7 +5168,8 @@ func (c *client) closeConnection(reason ClosedState) { if kind == LEAF { num = sub.qw } - key := string(sub.subject) + " " + string(sub.queue) + // TODO(dlc) - Better to use string builder? + key := bytesToString(sub.subject) + " " + bytesToString(sub.queue) if esub, ok := qsubs[key]; ok { esub.n += num } else { @@ -5323,7 +5330,7 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) { if genid := atomic.LoadUint64(&sl.genid); genid != pac.genid { ok = false - delete(c.in.pacache, string(c.pa.pacache)) + delete(c.in.pacache, bytesToString(c.pa.pacache)) } else { acc = pac.acc r = pac.results diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index 4295a4304c..39cc294cb7 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.10.5" + VERSION = "2.10.7" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index 6953abf518..f71184bcfa 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -700,10 +700,15 @@ func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) { mset.mu.RLock() - s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc + s, jsa, tierName, cfg, acc, closed := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc, mset.closed retention := cfg.Retention mset.mu.RUnlock() + // Check if this stream has closed. + if closed { + return nil, NewJSStreamInvalidError() + } + // If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn. // This can happen on startup with restored state where on meta replay we still do not have // the assignment. Running in single server mode this always returns true. 
@@ -812,8 +817,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, NewJSConsumerWQRequiresExplicitAckError() } - subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) if len(mset.consumers) > 0 { + subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) if len(subjects) == 0 { mset.mu.Unlock() return nil, NewJSConsumerWQMultipleUnfilteredError() @@ -1790,6 +1795,10 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { o.mu.Lock() defer o.mu.Unlock() + if o.closed || o.mset == nil { + return NewJSConsumerDoesNotExistError() + } + if err := o.acc.checkNewConsumerConfig(&o.cfg, cfg); err != nil { return err } @@ -2451,8 +2460,11 @@ func (o *consumer) setStoreState(state *ConsumerState) error { if state == nil || o.store == nil { return nil } - o.applyState(state) - return o.store.Update(state) + err := o.store.Update(state) + if err == nil { + o.applyState(state) + } + return err } // Update our state to the store. @@ -4485,7 +4497,7 @@ func (o *consumer) checkPending() { } // Since we can update timestamps, we have to review all pending. - // We will now bail if we see an ack pending in bound to us via o.awl. + // We will now bail if we see an ack pending inbound to us via o.awl. var expired []uint64 check := len(o.pending) > 1024 for seq, p := range o.pending { @@ -5253,23 +5265,30 @@ func gatherSubjectFilters(filter string, filters []string) []string { return filters } -// Will check if we are running in the monitor already and if not set the appropriate flag. -func (o *consumer) checkInMonitor() bool { +// shouldStartMonitor will return true if we should start a monitor +// goroutine or will return false if one is already running. +func (o *consumer) shouldStartMonitor() bool { o.mu.Lock() defer o.mu.Unlock() if o.inMonitor { - return true + return false } + o.monitorWg.Add(1) o.inMonitor = true - return false + return true } -// Clear us being in the monitor routine. +// Clear the monitor running state. The monitor goroutine should +// call this in a defer to clean up on exit. func (o *consumer) clearMonitorRunning() { o.mu.Lock() defer o.mu.Unlock() - o.inMonitor = false + + if o.inMonitor { + o.monitorWg.Done() + o.inMonitor = false + } } // Test whether we are in the monitor routine. @@ -5296,11 +5315,16 @@ func (o *consumer) checkStateForInterestStream() { return } + asflr := state.AckFloor.Stream + // Protect ourselves against rolling backwards. + if asflr&(1<<63) != 0 { + return + } + // We should make sure to update the acks. var ss StreamState mset.store.FastState(&ss) - asflr := state.AckFloor.Stream for seq := ss.FirstSeq; seq <= asflr; seq++ { mset.ackMsg(o, seq) } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index 52af94fdb8..573edb29d8 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -157,7 +157,6 @@ type psi struct { total uint64 fblk uint32 lblk uint32 - subj string } type fileStore struct { @@ -439,7 +438,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim return nil, err } - // Check if our prior remember a last past where we can see. + // Check if our prior state remembers a last sequence past where we can see. 
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq { fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime if lmb, err := fs.newMsgBlockForWrite(); err == nil { @@ -560,9 +559,8 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.mu.Lock() new_cfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: *cfg} old_cfg := fs.cfg - // Messages block reference fs.cfg.Subjects (in subjString) under the - // mb's lock, not fs' lock. So do the switch here under all existing - // message blocks' lock in order to silence the DATA RACE detector. + // The reference story has changed here, so this full msg block lock + // may not be needed. fs.lockAllMsgBlocks() fs.cfg = new_cfg fs.unlockAllMsgBlocks() @@ -1407,23 +1405,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { if !mb.dmap.Exists(seq) { mb.msgs++ mb.bytes += uint64(rl) - - // Rebuild per subject info if needed. - if slen > 0 { - if mb.fss == nil { - mb.fss = make(map[string]*SimpleState) - } - // For the lookup, we cast the byte slice and there won't be any copy - if ss := mb.fss[string(data[:slen])]; ss != nil { - ss.Msgs++ - ss.Last = seq - } else { - // This will either use a subject from the config, or make a copy - // so we don't reference the underlying buffer. - subj := mb.subjString(data[:slen]) - mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} - } - } } // Always set last @@ -1464,6 +1445,30 @@ func (fs *fileStore) warn(format string, args ...any) { fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...) } +// Track local state but ignore timestamps here. +func updateTrackingState(state *StreamState, mb *msgBlock) { + if state.FirstSeq == 0 { + state.FirstSeq = mb.first.seq + } else if mb.first.seq < state.FirstSeq { + state.FirstSeq = mb.first.seq + } + if mb.last.seq > state.LastSeq { + state.LastSeq = mb.last.seq + } + state.Msgs += mb.msgs + state.Bytes += mb.bytes +} + +// Determine if our tracking states are the same. +func trackingStatesEqual(fs, mb *StreamState) bool { + // When a fs is brand new the fs state will have first seq of 0, but tracking mb may have 1. + // If either has a first sequence that is not 0 or 1 we will check if they are the same, otherwise skip. + if fs.FirstSeq > 1 || mb.FirstSeq > 1 { + return fs.Msgs == mb.Msgs && fs.FirstSeq == mb.FirstSeq && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes + } + return fs.Msgs == mb.Msgs && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes +} + // recoverFullState will attempt to receover our last full state and re-process any state changes // that happened afterwards. func (fs *fileStore) recoverFullState() (rerr error) { @@ -1581,9 +1586,12 @@ func (fs *fileStore) recoverFullState() (rerr error) { fs.warn("Stream state bad subject len (%d)", lsubj) return errCorruptState } + // If we have lots of subjects this will alloc for each one. + // We could reference the underlying buffer, but we could guess wrong if + // number of blocks is large and subjects is low, since we would reference buf. subj := string(buf[bi : bi+lsubj]) bi += lsubj - psi := &psi{total: readU64(), fblk: uint32(readU64()), subj: subj} + psi := &psi{total: readU64(), fblk: uint32(readU64())} if psi.total > 1 { psi.lblk = uint32(readU64()) } else { @@ -1595,6 +1603,9 @@ func (fs *fileStore) recoverFullState() (rerr error) { } } + // Track the state as represented by the blocks themselves. 
+ var mstate StreamState + if numBlocks := readU64(); numBlocks > 0 { lastIndex := int(numBlocks - 1) fs.blks = make([]*msgBlock, 0, numBlocks) @@ -1626,6 +1637,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { // Only add in if not empty or the lmb. if mb.msgs > 0 || i == lastIndex { fs.addMsgBlock(mb) + updateTrackingState(&mstate, mb) } else { // Mark dirty to cleanup. fs.dirty++ @@ -1671,20 +1683,37 @@ func (fs *fileStore) recoverFullState() (rerr error) { if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched { // Remove the last message block since recover will add in the new one. fs.removeMsgBlockFromList(mb) + // Reverse update of tracking state for this mb, will add new state in below. + mstate.Msgs -= mb.msgs + mstate.Bytes -= mb.bytes if nmb, err := fs.recoverMsgBlockNoSubjectUpdates(mb.index); err != nil && !os.IsNotExist(err) { + fs.warn("Stream state could not recover last msg block") os.Remove(fn) return errCorruptState } else if nmb != nil { fs.adjustAccounting(mb, nmb) + updateTrackingState(&mstate, mb) } } + // On success double check our state. + checkState := func() error { + // We check first and last seq and number of msgs and bytes. If there is a difference, + // return and error so we rebuild from the message block state on disk. + if !trackingStatesEqual(&fs.state, &mstate) { + fs.warn("Stream state encountered internal inconsistency on recover") + os.Remove(fn) + return errCorruptState + } + return nil + } + // We may need to check other blocks. Even if we matched last checksum we will see if there is another block. for bi := blkIndex + 1; ; bi++ { nmb, err := fs.recoverMsgBlock(bi) if err != nil { if os.IsNotExist(err) { - return nil + return checkState() } os.Remove(fn) fs.warn("Stream state could not recover msg block %d", bi) @@ -1702,6 +1731,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { } fs.state.Msgs += nmb.msgs fs.state.Bytes += nmb.bytes + updateTrackingState(&mstate, nmb) } } } @@ -1739,7 +1769,7 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) { info.lblk = nmb.index } } else { - fs.psim[sm.subj] = &psi{total: 1, fblk: nmb.index, lblk: nmb.index, subj: sm.subj} + fs.psim[sm.subj] = &psi{total: 1, fblk: nmb.index, lblk: nmb.index} fs.tsl += len(sm.subj) } } @@ -2864,7 +2894,6 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { if lmb := fs.lmb; lmb != nil { index = lmb.index + 1 - // Determine if we can reclaim any resources here. if fs.fip { lmb.mu.Lock() @@ -2877,8 +2906,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } } - mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways} - + mb := fs.initMsgBlock(index) // Lock should be held to quiet race detector. 
mb.mu.Lock() mb.setupWriteCache(rbuf) @@ -2900,12 +2928,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.hh = hh - mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) - mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, mb.index)) mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms) if err != nil { mb.dirtyCloseWithRemove(true) - return nil, fmt.Errorf("Error creating msg block file [%q]: %v", mb.mfn, err) + return nil, fmt.Errorf("Error creating msg block file: %v", err) } mb.mfd = mfd @@ -3017,7 +3043,7 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in info.lblk = index } } else { - fs.psim[subj] = &psi{total: 1, fblk: index, lblk: index, subj: subj} + fs.psim[subj] = &psi{total: 1, fblk: index, lblk: index} fs.tsl += len(subj) } } @@ -3237,6 +3263,9 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { // Will check the msg limit and drop firstSeq msg if needed. // Lock should be held. func (fs *fileStore) enforceMsgLimit() { + if fs.cfg.Discard != DiscardOld { + return + } if fs.cfg.MaxMsgs <= 0 || fs.state.Msgs <= uint64(fs.cfg.MaxMsgs) { return } @@ -3251,6 +3280,9 @@ func (fs *fileStore) enforceMsgLimit() { // Will check the bytes limit and drop msgs if needed. // Lock should be held. func (fs *fileStore) enforceBytesLimit() { + if fs.cfg.Discard != DiscardOld { + return + } if fs.cfg.MaxBytes <= 0 || fs.state.Bytes <= uint64(fs.cfg.MaxBytes) { return } @@ -3692,15 +3724,11 @@ func (mb *msgBlock) compact() { // Normal message here. nbuf = append(nbuf, buf[index:index+rl]...) if !firstSet { - firstSet, fseq = true, seq + firstSet = true atomic.StoreUint64(&mb.first.seq, seq) } } } - // Always set last as long as not a tombstone. - if seq&tbit == 0 { - atomic.StoreUint64(&mb.last.seq, seq&^ebit) - } // Advance to next record. index += rl } @@ -3742,10 +3770,15 @@ func (mb *msgBlock) compact() { return } - // Wipe dmap and rebuild here. - mb.dmap.Empty() - mb.rebuildStateLocked() + // Capture the updated rbytes. + mb.rbytes = uint64(len(nbuf)) + // Remove any seqs from the beginning of the blk. + for seq, nfseq := fseq, atomic.LoadUint64(&mb.first.seq); seq < nfseq; seq++ { + mb.dmap.Delete(seq) + } + // Make sure we clear the cache since no longer valid. + mb.clearCacheAndOffset() // If we entered with the msgs loaded make sure to reload them. if wasLoaded { mb.loadMsgsWithLock() @@ -5060,7 +5093,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { ss.Msgs++ ss.Last = seq } else { - mb.fss[mb.subjString(bsubj)] = &SimpleState{Msgs: 1, First: seq, Last: seq} + mb.fss[string(bsubj)] = &SimpleState{ + Msgs: 1, + First: seq, + Last: seq, + } } } } @@ -5626,39 +5663,14 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store sm.msg = sm.buf[0 : end-slen] } sm.seq, sm.ts = seq, ts - // Treat subject a bit different to not reference underlying buf. if slen > 0 { - sm.subj = mb.subjString(data[:slen]) + // Make a copy since sm.subj lifetime may last longer. + sm.subj = string(data[:slen]) } return sm, nil } -// Given the `key` byte slice, this function will return the subject -// as an interned string of `key` or a configured subject as to minimize memory allocations. -// We used to have a pool structure when we leaned on block fss, which could duplicate subjects. -// Now we have fs scoped PSIM that is always present and is already tracking all in-use subjects. -// Lock should be held. 
-func (fs *fileStore) subjString(skey []byte) string { - if fs == nil || len(skey) == 0 { - return _EMPTY_ - } - if len(fs.psim) > 0 { - // Cast in place below to avoid allocation for lookup. - if psi := fs.psim[string(skey)]; psi != nil { - return psi.subj - } - } - return string(skey) -} - -// Given the `key` byte slice, this function will return the subject -// as an interned string of `key` or a configured subject as to minimize memory allocations. -// Lock should be held. -func (mb *msgBlock) subjString(skey []byte) string { - return mb.fs.subjString(skey) -} - // LoadMsg will lookup the message by sequence number and return it if found. func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) { return fs.msgForSeq(seq, sm) @@ -6316,6 +6328,8 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { smb.mu.Lock() if atomic.LoadUint64(&smb.first.seq) == seq { + fs.state.FirstSeq = atomic.LoadUint64(&smb.first.seq) + fs.state.FirstTime = time.Unix(0, smb.first.ts).UTC() goto SKIP } @@ -6758,7 +6772,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si buf := mb.cache.buf[li:] hdr := buf[:msgHdrSize] slen := int(le.Uint16(hdr[20:])) - if subj == string(buf[msgHdrSize:msgHdrSize+slen]) { + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { seq := le.Uint64(hdr[4:]) if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { continue @@ -6869,7 +6883,7 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) { info.lblk = mb.index } } else { - fs.psim[subj] = &psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index, subj: subj} + fs.psim[subj] = &psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index} fs.tsl += len(subj) } } @@ -6947,7 +6961,7 @@ func (fs *fileStore) Delete() error { // Do Purge() since if we have lots of blocks uses a mv/rename. fs.Purge() - if err := fs.Stop(); err != nil { + if err := fs.stop(false); err != nil { return err } @@ -7006,7 +7020,7 @@ const ( // This is also called during Stop(). func (fs *fileStore) flushStreamStateLoop(fch, qch, done chan struct{}) { // Make sure we do not try to write these out too fast. - const writeThreshold = time.Second * 10 + const writeThreshold = time.Minute lastWrite := time.Time{} // We will use these to complete the full state write while not doing them too fast. @@ -7065,7 +7079,6 @@ func timestampNormalized(t time.Time) int64 { // 4. Last block index and hash of record inclusive to this stream state. func (fs *fileStore) writeFullState() error { fs.mu.Lock() - if fs.closed || fs.dirty == 0 { fs.mu.Unlock() return nil @@ -7133,6 +7146,9 @@ func (fs *fileStore) writeFullState() error { baseTime := timestampNormalized(fs.state.FirstTime) var scratch [8 * 1024]byte + // Track the state as represented by the mbs. + var mstate StreamState + var dmapTotalLen int for _, mb := range fs.blks { mb.mu.RLock() @@ -7157,6 +7173,7 @@ func (fs *fileStore) writeFullState() error { mb.ensureLastChecksumLoaded() copy(lchk[0:], mb.lchk[:]) } + updateTrackingState(&mstate, mb) mb.mu.RUnlock() } if dmapTotalLen > 0 { @@ -7186,9 +7203,22 @@ func (fs *fileStore) writeFullState() error { // Snapshot prior dirty count. priorDirty := fs.dirty + + // Check tracking state. + statesEqual := trackingStatesEqual(&fs.state, &mstate) // Release lock. fs.mu.Unlock() + // Check consistency here. + if !statesEqual { + fs.warn("Stream state encountered internal inconsistency on write") + // Rebuild our fs state from the mb state. + fs.rebuildState(nil) + // Make sure to reprocess. 
+ fs.kickFlushStateLoop() + return errCorruptState + } + if cap(buf) > sz { fs.warn("WriteFullState reallocated from %d to %d", sz, cap(buf)) } @@ -7225,6 +7255,11 @@ func (fs *fileStore) writeFullState() error { // Stop the current filestore. func (fs *fileStore) Stop() error { + return fs.stop(true) +} + +// Stop the current filestore. +func (fs *fileStore) stop(writeState bool) error { fs.mu.Lock() if fs.closed || fs.closing { fs.mu.Unlock() @@ -7235,7 +7270,9 @@ func (fs *fileStore) Stop() error { // so we don't end up with this function running more than once. fs.closing = true - fs.checkAndFlushAllBlocks() + if writeState { + fs.checkAndFlushAllBlocks() + } fs.closeAllMsgBlocks(false) fs.cancelSyncTimer() @@ -7247,13 +7284,15 @@ func (fs *fileStore) Stop() error { fs.qch = nil } - // Wait for the state flush loop to exit. - fsld := fs.fsld - fs.mu.Unlock() - <-fsld - // Write full state if needed. If not dirty this is a no-op. - fs.writeFullState() - fs.mu.Lock() + if writeState { + // Wait for the state flush loop to exit. + fsld := fs.fsld + fs.mu.Unlock() + <-fsld + // Write full state if needed. If not dirty this is a no-op. + fs.writeFullState() + fs.mu.Lock() + } // Mark as closed. Last message block needs to be cleared after // writeFullState has completed. @@ -8473,6 +8512,12 @@ func decodeConsumerState(buf []byte) (*ConsumerState, error) { } } + // Protect ourselves against rolling backwards. + const hbit = 1 << 63 + if state.AckFloor.Stream&hbit != 0 || state.Delivered.Stream&hbit != 0 { + return nil, errCorruptState + } + // We have additional stuff. if numPending := readLen(); numPending > 0 { mints := readTimeStamp() @@ -8790,4 +8835,5 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) { output = append(output, checksum...) return output, reader.Close() + } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/gateway.go b/vendor/github.com/nats-io/nats-server/v2/server/gateway.go index 715a2c1dc5..f5f154700c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/gateway.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/gateway.go @@ -1,4 +1,4 @@ -// Copyright 2018-2020 The NATS Authors +// Copyright 2018-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1225,22 +1225,22 @@ func (s *Server) forwardNewGatewayToLocalCluster(oinfo *Info) { // messages from the remote's outbound connection. This side is // the one sending the subscription interest. func (s *Server) sendQueueSubsToGateway(c *client) { - s.sendSubsToGateway(c, nil) + s.sendSubsToGateway(c, _EMPTY_) } // Sends all subscriptions for the given account to the remove gateway // This is sent from the inbound side, that is, the side that receives // messages from the remote's outbound connection. This side is // the one sending the subscription interest. 
-func (s *Server) sendAccountSubsToGateway(c *client, accName []byte) { +func (s *Server) sendAccountSubsToGateway(c *client, accName string) { s.sendSubsToGateway(c, accName) } -func gwBuildSubProto(buf *bytes.Buffer, accName []byte, acc map[string]*sitally, doQueues bool) { +func gwBuildSubProto(buf *bytes.Buffer, accName string, acc map[string]*sitally, doQueues bool) { for saq, si := range acc { if doQueues && si.q || !doQueues && !si.q { buf.Write(rSubBytes) - buf.Write(accName) + buf.WriteString(accName) buf.WriteByte(' ') // For queue subs (si.q is true), saq will be // subject + ' ' + queue, for plain subs, this is @@ -1255,7 +1255,7 @@ func gwBuildSubProto(buf *bytes.Buffer, accName []byte, acc map[string]*sitally, } // Sends subscriptions to remote gateway. -func (s *Server) sendSubsToGateway(c *client, accountName []byte) { +func (s *Server) sendSubsToGateway(c *client, accountName string) { var ( bufa = [32 * 1024]byte{} bbuf = bytes.NewBuffer(bufa[:0]) @@ -1268,22 +1268,22 @@ func (s *Server) sendSubsToGateway(c *client, accountName []byte) { defer gw.pasi.Unlock() // If account is specified... - if accountName != nil { + if accountName != _EMPTY_ { // Simply send all plain subs (no queues) for this specific account - gwBuildSubProto(bbuf, accountName, gw.pasi.m[string(accountName)], false) + gwBuildSubProto(bbuf, accountName, gw.pasi.m[accountName], false) // Instruct to send all subs (RS+/-) for this account from now on. c.mu.Lock() - e := c.gw.insim[string(accountName)] + e := c.gw.insim[accountName] if e == nil { e = &insie{} - c.gw.insim[string(accountName)] = e + c.gw.insim[accountName] = e } e.mode = InterestOnly c.mu.Unlock() } else { // Send queues for all accounts for accName, acc := range gw.pasi.m { - gwBuildSubProto(bbuf, []byte(accName), acc, true) + gwBuildSubProto(bbuf, accName, acc, true) } } @@ -1899,7 +1899,7 @@ func (c *client) processGatewayRUnsub(arg []byte) error { return nil } if e.sl.Remove(sub) == nil { - delete(c.subs, string(key)) + delete(c.subs, bytesToString(key)) if queue != nil { e.qsubs-- atomic.AddInt64(&c.srv.gateway.totalQSubs, -1) @@ -1976,7 +1976,7 @@ func (c *client) processGatewayRSub(arg []byte) error { } defer c.mu.Unlock() - ei, _ := c.gw.outsim.Load(string(accName)) + ei, _ := c.gw.outsim.Load(bytesToString(accName)) // We should always have an existing entry for plain subs because // in optimistic mode we would have received RS- first, and // in full knowledge, we are receiving RS+ for an account after @@ -2038,7 +2038,7 @@ func (c *client) processGatewayRSub(arg []byte) error { srv = c.srv callUpdate = true } else { - subj := string(subject) + subj := bytesToString(subject) // If this is an RS+ for a wc subject, then // remove from the no interest map all subjects // that are a subset of this wc subject. @@ -2149,8 +2149,8 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription accProtoa [256]byte accProto []byte proto []byte - subject = string(sub.subject) - hasWc = subjectHasWildcard(subject) + subject = bytesToString(sub.subject) + hasWC = subjectHasWildcard(subject) ) for _, c := range gws { proto = nil @@ -2165,7 +2165,7 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription // For wildcard subjects, we will remove from our no-interest // map, all subjects that are a subset of this wc subject, but we // still send the wc subject and let the remote do its own cleanup. 
- if hasWc { + if hasWC { for enis := range e.ni { if subjectIsSubsetMatch(enis, subject) { delete(e.ni, enis) @@ -2337,7 +2337,7 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha } else { entry.n += change if entry.n <= 0 { - delete(st, string(key)) + delete(st, bytesToString(key)) last = true if len(st) == 0 { delete(accMap, accName) @@ -2381,7 +2381,7 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha // that is, starts with $GNR and is long enough to contain cluster/server hash // and subject. func isGWRoutedReply(subj []byte) bool { - return len(subj) > gwSubjectOffset && string(subj[:gwReplyPrefixLen]) == gwReplyPrefix + return len(subj) > gwSubjectOffset && bytesToString(subj[:gwReplyPrefixLen]) == gwReplyPrefix } // Same than isGWRoutedReply but accepts the old prefix $GR and returns @@ -2390,7 +2390,7 @@ func isGWRoutedSubjectAndIsOldPrefix(subj []byte) (bool, bool) { if isGWRoutedReply(subj) { return true, false } - if len(subj) > oldGWReplyStart && string(subj[:oldGWReplyPrefixLen]) == oldGWReplyPrefix { + if len(subj) > oldGWReplyStart && bytesToString(subj[:oldGWReplyPrefixLen]) == oldGWReplyPrefix { return true, true } return false, false @@ -2399,7 +2399,7 @@ func isGWRoutedSubjectAndIsOldPrefix(subj []byte) (bool, bool) { // Returns true if subject starts with "$GNR.". This is to check that // clients can't publish on this subject. func hasGWRoutedReplyPrefix(subj []byte) bool { - return len(subj) > gwReplyPrefixLen && string(subj[:gwReplyPrefixLen]) == gwReplyPrefix + return len(subj) > gwReplyPrefixLen && bytesToString(subj[:gwReplyPrefixLen]) == gwReplyPrefix } // Evaluates if the given reply should be mapped or not. @@ -2455,7 +2455,6 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr return false } var ( - subj = string(subject) queuesa = [512]byte{} queues = queuesa[:0] accName = acc.Name @@ -2499,7 +2498,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr } } else { // Plain sub interest and queue sub results for this account/subject - psi, qr := gwc.gatewayInterest(accName, subj) + psi, qr := gwc.gatewayInterest(accName, string(subject)) if !psi && qr == nil { continue } @@ -2760,11 +2759,11 @@ func (s *Server) removeRouteByHash(srvIDHash string) { // Returns the route with given hash or nil if not found. // This is for gateways only. func (s *Server) getRouteByHash(hash, accName []byte) (*client, bool) { - id := string(hash) + id := bytesToString(hash) var perAccount bool - if v, ok := s.accRouteByHash.Load(string(accName)); ok { + if v, ok := s.accRouteByHash.Load(bytesToString(accName)); ok { if v == nil { - id += string(accName) + id += bytesToString(accName) perAccount = true } else { id += strconv.Itoa(v.(int)) @@ -2774,7 +2773,7 @@ func (s *Server) getRouteByHash(hash, accName []byte) (*client, bool) { return v.(*client), perAccount } else if !perAccount { // Check if we have a "no pool" connection at index 0. - if v, ok := s.gateway.routesIDByHash.Load(string(hash) + "0"); ok { + if v, ok := s.gateway.routesIDByHash.Load(bytesToString(hash) + "0"); ok { if r := v.(*client); r != nil { r.mu.Lock() noPool := r.route.noPool @@ -3051,13 +3050,13 @@ func (c *client) gatewayAllSubsReceiveStart(info *Info) { // func (c *client) gatewayAllSubsReceiveComplete(info *Info) { account := getAccountFromGatewayCommand(c, info, "complete") - if account == "" { + if account == _EMPTY_ { return } // Done receiving all subs from remote. 
Set the `ni` // map to nil so that gatewayInterest() no longer // uses it. - ei, _ := c.gw.outsim.Load(string(account)) + ei, _ := c.gw.outsim.Load(account) if ei != nil { e := ei.(*outsie) // Needs locking here since `ni` is checked by @@ -3077,7 +3076,7 @@ func getAccountFromGatewayCommand(c *client, info *Info, cmd string) string { if info.GatewayCmdPayload == nil { c.sendErrAndErr(fmt.Sprintf("Account absent from receive-all-subscriptions-%s command", cmd)) c.closeConnection(ProtocolViolation) - return "" + return _EMPTY_ } return string(info.GatewayCmdPayload) } @@ -3113,7 +3112,7 @@ func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie, accName string) { info := Info{ Gateway: s.gateway.name, GatewayCmd: cmd, - GatewayCmdPayload: []byte(accName), + GatewayCmdPayload: stringToBytes(accName), } b, _ := json.Marshal(&info) @@ -3138,7 +3137,7 @@ func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie, accName string) { s.startGoRoutine(func() { defer s.grWG.Done() - s.sendAccountSubsToGateway(c, []byte(accName)) + s.sendAccountSubsToGateway(c, accName) // Send the complete command. When the remote receives // this, it will not send a message unless it has a // matching sub from us. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index d55b3e0bcd..5354e974b3 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -1991,9 +1991,9 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor js.mu.Unlock() return nil } - js.mu.Unlock() s.Debugf("JetStream cluster creating raft group:%+v", rg) + js.mu.Unlock() sysAcc := s.SystemAccount() if sysAcc == nil { @@ -4300,8 +4300,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } else { // Clustered consumer. // Start our monitoring routine if needed. - if !alreadyRunning && !o.isMonitorRunning() { - o.monitorWg.Add(1) + if !alreadyRunning && o.shouldStartMonitor() { s.startGoRoutine( func() { js.monitorConsumer(o, ca) }, pprofLabels{ @@ -4508,19 +4507,13 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { s, n, cc := js.server(), o.raftNode(), js.cluster defer s.grWG.Done() - defer o.monitorWg.Done() + defer o.clearMonitorRunning() if n == nil { s.Warnf("No RAFT group for '%s > %s > %s'", o.acc.Name, ca.Stream, ca.Name) return } - // Make sure only one is running. - if o.checkInMonitor() { - return - } - defer o.clearMonitorRunning() - // Make sure to stop the raft group on exit to prevent accidental memory bloat. // This should be below the checkInMonitor call though to avoid stopping it out // from underneath the one that is running since it will be the same raft node. 
@@ -7439,7 +7432,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ mset.mu.RLock() canRespond := !mset.cfg.NoAck && len(reply) > 0 - name, stype := mset.cfg.Name, mset.cfg.Storage + name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed @@ -7539,6 +7532,26 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Some header checks can be checked pre proposal. Most can not. if len(hdr) > 0 { + // Expected last sequence per subject. + // We can check for last sequence per subject but only if the expected seq <= lseq. + if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 && seq <= lseq { + var smv StoreMsg + var fseq uint64 + sm, err := store.LoadLastMsg(subject, &smv) + if sm != nil { + fseq = sm.seq + } + if err != nil || fseq != seq { + if canRespond { + var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} + resp.PubAck = &PubAck{Stream: name} + resp.Error = NewJSStreamWrongLastSequenceError(fseq) + b, _ := json.Marshal(resp) + outq.sendMsg(reply, b) + } + return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq) + } + } // Expected stream name can also be pre-checked. if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name { if canRespond { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go index 97fa8b4197..e9503b9d1a 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go @@ -828,7 +828,7 @@ func (c *client) sendLeafConnect(clusterName string, headers bool) error { sigraw, _ := kp.Sign(c.nonce) sig := base64.RawURLEncoding.EncodeToString(sigraw) - cinfo.JWT = string(tmp) + cinfo.JWT = bytesToString(tmp) cinfo.Sig = sig } else if userInfo := c.leaf.remote.curURL.User; userInfo != nil { cinfo.User = userInfo.Username() @@ -1039,7 +1039,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf // Remember the nonce we sent here for signatures, etc. c.nonce = make([]byte, nonceLen) copy(c.nonce, nonce[:]) - info.Nonce = string(c.nonce) + info.Nonce = bytesToString(c.nonce) info.CID = c.cid proto := generateInfoJSON(info) if !opts.LeafNode.TLSHandshakeFirst { @@ -1363,7 +1363,7 @@ func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompre cid := c.cid var nonce string if !didSolicit { - nonce = string(c.nonce) + nonce = bytesToString(c.nonce) } c.mu.Unlock() @@ -1970,16 +1970,15 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { rc := c.leaf.remoteCluster c.leaf.smap = make(map[string]int32) for _, sub := range subs { - subj := string(sub.subject) // Check perms regardless of role. - if !c.canSubscribe(subj) { - c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", subj, accName, accNTag) + if c.perms != nil && !c.canSubscribe(string(sub.subject)) { + c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag) continue } // We ignore ourselves here. // Also don't add the subscription if it has a origin cluster and the // cluster name matches the one of the client we are sending to. 
- if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) { + if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) { count := int32(1) if len(sub.queue) > 0 && sub.qw > 0 { count = sub.qw @@ -2069,7 +2068,7 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) { // Capture the cluster even if its empty. cluster := _EMPTY_ if sub.origin != nil { - cluster = string(sub.origin) + cluster = bytesToString(sub.origin) } // If we have an isolated cluster we can return early, as long as it is not a loop detection subject. @@ -2196,19 +2195,15 @@ func (c *client) sendLeafNodeSubUpdate(key string, n int32) { // Helper function to build the key. func keyFromSub(sub *subscription) string { - var _rkey [1024]byte - var key []byte - + var sb strings.Builder + sb.Grow(len(sub.subject) + len(sub.queue) + 1) + sb.Write(sub.subject) if sub.queue != nil { // Just make the key subject spc group, e.g. 'foo bar' - key = _rkey[:0] - key = append(key, sub.subject...) - key = append(key, byte(' ')) - key = append(key, sub.queue...) - } else { - key = sub.subject + sb.WriteByte(' ') + sb.Write(sub.queue) } - return string(key) + return sb.String() } // Lock should be held. @@ -2281,7 +2276,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { acc := c.acc // Check if we have a loop. ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) - if ldsPrefix && string(sub.subject) == acc.getLDSubject() { + if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() { c.mu.Unlock() c.handleLeafNodeLoop(true) return nil @@ -2298,11 +2293,14 @@ func (c *client) processLeafSub(argo []byte) (err error) { } // If we are a hub check that we can publish to this subject. - if checkPerms && subjectIsLiteral(string(sub.subject)) && !c.pubAllowedFullCheck(string(sub.subject), true, true) { - c.mu.Unlock() - c.leafSubPermViolation(sub.subject) - c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) - return nil + if checkPerms { + subj := string(sub.subject) + if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) { + c.mu.Unlock() + c.leafSubPermViolation(sub.subject) + c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) + return nil + } } // Check if we have a maximum on the number of subscriptions. @@ -2324,7 +2322,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { } else { sub.sid = arg } - key := string(sub.sid) + key := bytesToString(sub.sid) osub := c.subs[key] updateGWs := false delta := int32(1) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index adf660846c..9853dd316f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -284,7 +284,18 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { if ts <= ms.msgs[ms.state.FirstSeq].ts { return ms.state.FirstSeq } - last := ms.msgs[ms.state.LastSeq].ts + // LastSeq is not guaranteed to be present since last does not go backwards. 
+ var lmsg *StoreMsg + for lseq := ms.state.LastSeq; lseq > ms.state.FirstSeq; lseq-- { + if lmsg = ms.msgs[lseq]; lmsg != nil { + break + } + } + if lmsg == nil { + return ms.state.FirstSeq + } + + last := lmsg.ts if ts == last { return ms.state.LastSeq } @@ -292,7 +303,10 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { return ms.state.LastSeq + 1 } index := sort.Search(len(ms.msgs), func(i int) bool { - return ms.msgs[uint64(i)+ms.state.FirstSeq].ts >= ts + if msg := ms.msgs[ms.state.FirstSeq+uint64(i)]; msg != nil { + return msg.ts >= ts + } + return false }) return uint64(index) + ms.state.FirstSeq } @@ -550,6 +564,9 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { // Will check the msg limit and drop firstSeq msg if needed. // Lock should be held. func (ms *memStore) enforceMsgLimit() { + if ms.cfg.Discard != DiscardOld { + return + } if ms.cfg.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.cfg.MaxMsgs) { return } @@ -561,6 +578,9 @@ func (ms *memStore) enforceMsgLimit() { // Will check the bytes limit and drop msgs if needed. // Lock should be held. func (ms *memStore) enforceBytesLimit() { + if ms.cfg.Discard != DiscardOld { + return + } if ms.cfg.MaxBytes <= 0 || ms.state.Bytes <= uint64(ms.cfg.MaxBytes) { return } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go index 073c468e0d..90f3f1de63 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go @@ -1159,68 +1159,68 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { // Varz will output server information on the monitoring port at /varz. type Varz struct { - ID string `json:"server_id"` - Name string `json:"server_name"` - Version string `json:"version"` - Proto int `json:"proto"` - GitCommit string `json:"git_commit,omitempty"` - GoVersion string `json:"go"` - Host string `json:"host"` - Port int `json:"port"` - AuthRequired bool `json:"auth_required,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` - IP string `json:"ip,omitempty"` - ClientConnectURLs []string `json:"connect_urls,omitempty"` - WSConnectURLs []string `json:"ws_connect_urls,omitempty"` - MaxConn int `json:"max_connections"` - MaxSubs int `json:"max_subscriptions,omitempty"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HTTPHost string `json:"http_host"` - HTTPPort int `json:"http_port"` - HTTPBasePath string `json:"http_base_path"` - HTTPSPort int `json:"https_port"` - AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int32 `json:"max_control_line"` - MaxPayload int `json:"max_payload"` - MaxPending int64 `json:"max_pending"` - Cluster ClusterOptsVarz `json:"cluster,omitempty"` - Gateway GatewayOptsVarz `json:"gateway,omitempty"` - LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` - MQTT MQTTOptsVarz `json:"mqtt,omitempty"` - Websocket WebsocketOptsVarz `json:"websocket,omitempty"` - JetStream JetStreamVarz `json:"jetstream,omitempty"` - TLSTimeout float64 `json:"tls_timeout"` - WriteDeadline time.Duration `json:"write_deadline"` - Start time.Time `json:"start"` - Now time.Time `json:"now"` - Uptime string `json:"uptime"` - Mem int64 `json:"mem"` - Cores int `json:"cores"` - MaxProcs int `json:"gomaxprocs"` - CPU float64 `json:"cpu"` - Connections int 
`json:"connections"` - TotalConnections uint64 `json:"total_connections"` - Routes int `json:"routes"` - Remotes int `json:"remotes"` - Leafs int `json:"leafnodes"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - SlowConsumers int64 `json:"slow_consumers"` - Subscriptions uint32 `json:"subscriptions"` - HTTPReqStats map[string]uint64 `json:"http_req_stats"` - ConfigLoadTime time.Time `json:"config_load_time"` - Tags jwt.TagList `json:"tags,omitempty"` - TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"` - TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"` - SystemAccount string `json:"system_account,omitempty"` - PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` - OCSPResponseCache OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` - SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` + ID string `json:"server_id"` + Name string `json:"server_name"` + Version string `json:"version"` + Proto int `json:"proto"` + GitCommit string `json:"git_commit,omitempty"` + GoVersion string `json:"go"` + Host string `json:"host"` + Port int `json:"port"` + AuthRequired bool `json:"auth_required,omitempty"` + TLSRequired bool `json:"tls_required,omitempty"` + TLSVerify bool `json:"tls_verify,omitempty"` + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + IP string `json:"ip,omitempty"` + ClientConnectURLs []string `json:"connect_urls,omitempty"` + WSConnectURLs []string `json:"ws_connect_urls,omitempty"` + MaxConn int `json:"max_connections"` + MaxSubs int `json:"max_subscriptions,omitempty"` + PingInterval time.Duration `json:"ping_interval"` + MaxPingsOut int `json:"ping_max"` + HTTPHost string `json:"http_host"` + HTTPPort int `json:"http_port"` + HTTPBasePath string `json:"http_base_path"` + HTTPSPort int `json:"https_port"` + AuthTimeout float64 `json:"auth_timeout"` + MaxControlLine int32 `json:"max_control_line"` + MaxPayload int `json:"max_payload"` + MaxPending int64 `json:"max_pending"` + Cluster ClusterOptsVarz `json:"cluster,omitempty"` + Gateway GatewayOptsVarz `json:"gateway,omitempty"` + LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` + MQTT MQTTOptsVarz `json:"mqtt,omitempty"` + Websocket WebsocketOptsVarz `json:"websocket,omitempty"` + JetStream JetStreamVarz `json:"jetstream,omitempty"` + TLSTimeout float64 `json:"tls_timeout"` + WriteDeadline time.Duration `json:"write_deadline"` + Start time.Time `json:"start"` + Now time.Time `json:"now"` + Uptime string `json:"uptime"` + Mem int64 `json:"mem"` + Cores int `json:"cores"` + MaxProcs int `json:"gomaxprocs"` + CPU float64 `json:"cpu"` + Connections int `json:"connections"` + TotalConnections uint64 `json:"total_connections"` + Routes int `json:"routes"` + Remotes int `json:"remotes"` + Leafs int `json:"leafnodes"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + SlowConsumers int64 `json:"slow_consumers"` + Subscriptions uint32 `json:"subscriptions"` + HTTPReqStats map[string]uint64 `json:"http_req_stats"` + ConfigLoadTime time.Time `json:"config_load_time"` + Tags jwt.TagList `json:"tags,omitempty"` + TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"` + TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"` + SystemAccount string `json:"system_account,omitempty"` + PinnedAccountFail uint64 
`json:"pinned_account_fails,omitempty"` + OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` + SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` } // JetStreamVarz contains basic runtime information about jetstream @@ -1767,7 +1767,7 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64 if s.ocsprc != nil && s.ocsprc.Type() != "none" { stats := s.ocsprc.Stats() if stats != nil { - v.OCSPResponseCache = OCSPResponseCacheVarz{ + v.OCSPResponseCache = &OCSPResponseCacheVarz{ s.ocsprc.Type(), stats.Hits, stats.Misses, diff --git a/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go b/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go index b347936f5d..783af64bf5 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go @@ -183,6 +183,11 @@ const ( // For Websocket URLs mqttWSPath = "/mqtt" + + mqttInitialPubHeader = 16 // An overkill, should need 7 bytes max + mqttProcessSubTooLong = 100 * time.Millisecond + mqttRetainedCacheTTL = 2 * time.Minute + mqttRetainedTransferTimeout = 10 * time.Second ) var ( @@ -237,6 +242,7 @@ type mqttAccountSessionManager struct { flapTimer *time.Timer // Timer to perform some cleanup of the flappers map sl *Sublist // sublist allowing to find retained messages for given subscription retmsgs map[string]*mqttRetainedMsgRef // retained messages + rmsCache sync.Map // map[string(subject)]mqttRetainedMsg jsa mqttJSA rrmLastSeq uint64 // Restore retained messages expected last sequence rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded @@ -269,7 +275,13 @@ type mqttRetMsgDel struct { } type mqttSession struct { - mu sync.Mutex + // subsMu is a "quick" version of the session lock, sufficient for the QoS0 + // callback. It only guarantees that a new subscription is initialized, and + // its retained messages if any have been queued up for delivery. The QoS12 + // callback uses the session lock. + mu sync.Mutex + subsMu sync.RWMutex + id string // client ID idHash string // client ID hash c *client @@ -321,6 +333,8 @@ type mqttRetainedMsg struct { Msg []byte `json:"msg,omitempty"` Flags byte `json:"flags,omitempty"` Source string `json:"source,omitempty"` + + expiresFromCache time.Time } type mqttRetainedMsgRef struct { @@ -329,13 +343,25 @@ type mqttRetainedMsgRef struct { sub *subscription } +// mqttSub contains fields associated with a MQTT subscription, and is added to +// the main subscription struct for MQTT message delivery subscriptions. The +// delivery callbacks may get invoked before sub.mqtt is set up, so they should +// acquire either sess.mu or sess.subsMu before accessing it. type mqttSub struct { - qos byte - // Pending serialization of retained messages to be sent when subscription is registered - prm *mqttWriter - // This is the JS durable name this subscription is attached to. + // The sub's QOS and the JS durable name. They can change when + // re-subscribing, and are used in the delivery callbacks. They can be + // quickly accessed using sess.subsMu.RLock, or under the main session lock. + qos byte jsDur string - // If this subscription needs to be checked for being reserved. E.g. # or * or */ + + // Pending serialization of retained messages to be sent when subscription + // is registered. The sub's delivery callbacks must wait until `prm` is + // ready (can block on sess.mu for that, too). 
+ prm [][]byte + + // If this subscription needs to be checked for being reserved. E.g. '#' or + // '*' or '*/'. It is set up at the time of subscription and is immutable + // after that. reserved bool } @@ -1090,7 +1116,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc c := s.createInternalAccountClient() c.acc = acc - id := getHash(s.Name()) + id := s.NodeName() replicas := opts.MQTT.StreamReplicas if replicas <= 0 { replicas = s.mqttDetermineReplicas() @@ -1203,6 +1229,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc as.sendJSAPIrequests(s, c, accName, closeCh) }) + // Start the go routine that will clean up cached retained messages that expired. + s.startGoRoutine(func() { + defer s.grWG.Done() + as.cleaupRetainedMessageCache(s, closeCh) + }) + lookupStream := func(stream, txt string) (*StreamInfo, error) { si, err := jsa.lookupStream(stream) if err != nil { @@ -1301,10 +1333,13 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc } // This is the only case where we need "si" after lookup/create + needToTransfer := true si, err := lookupStream(mqttRetainedMsgsStreamName, "retained messages") - if err != nil { + switch { + case err != nil: return nil, err - } else if si == nil { + + case si == nil: // Create the stream for retained messages. cfg := &StreamConfig{ Name: mqttRetainedMsgsStreamName, @@ -1327,7 +1362,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc return nil, err } } + needToTransfer = false + + default: + needToTransfer = si.Config.MaxMsgsPer != 1 } + // Doing this check outside of above if/else due to possible race when // creating the stream. wantedSubj := mqttRetainedMsgsStreamSubject + ">" @@ -1338,30 +1378,49 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc return nil, fmt.Errorf("failed to update stream config: %w", err) } } - // Try to transfer regardless if we have already updated the stream or not - // in case not all messages were transferred and the server was restarted. - if as.transferRetainedToPerKeySubjectStream(s) { + + transferRMS := func() error { + if !needToTransfer { + return nil + } + + as.transferRetainedToPerKeySubjectStream(s) + // We need another lookup to have up-to-date si.State values in order // to load all retained messages. si, err = lookupStream(mqttRetainedMsgsStreamName, "retained messages") if err != nil { - return nil, err + return err } + needToTransfer = false + return nil } + + // Attempt to transfer all "single subject" retained messages to new + // subjects. It may fail, will log its own error; ignore it the first time + // and proceed to updating MaxMsgsPer. Then we invoke transferRMS() again, + // which will get another chance to resolve the error; if not we bail there. + if err = transferRMS(); err != nil { + return nil, err + } + // Now, if the stream does not have MaxMsgsPer set to 1, and there are no // more messages on the single $MQTT.rmsgs subject, update the stream again. if si.Config.MaxMsgsPer != 1 { - _, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs") - // Looking for an error indicated that there is no such message. - if err != nil && IsNatsErr(err, JSNoMessageFoundErr) { - si.Config.MaxMsgsPer = 1 - // We will need an up-to-date si, so don't use local variable here. 
- if si, err = jsa.updateStream(&si.Config); err != nil { - return nil, fmt.Errorf("failed to update stream config: %w", err) - } + si.Config.MaxMsgsPer = 1 + // We will need an up-to-date si, so don't use local variable here. + if si, err = jsa.updateStream(&si.Config); err != nil { + return nil, fmt.Errorf("failed to update stream config: %w", err) } } + // If we failed the first time, there is now at most one lingering message + // in the old subject. Try again (it will be a NO-OP if succeeded the first + // time). + if err = transferRMS(); err != nil { + return nil, err + } + var lastSeq uint64 var rmDoneCh chan struct{} st := si.State @@ -1466,20 +1525,21 @@ func (jsa *mqttJSA) prefixDomain(subject string) string { func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []byte, timeout time.Duration) (interface{}, error) { var sb strings.Builder - jsa.mu.Lock() // Either we use nuid.Next() which uses a global lock, or our own nuid object, but // then it needs to be "write" protected. This approach will reduce across account // contention since we won't use the global nuid's lock. + jsa.mu.Lock() + uid := jsa.nuid.Next() sb.WriteString(jsa.rplyr) + jsa.mu.Unlock() + sb.WriteString(kind) sb.WriteByte(btsep) if cidHash != _EMPTY_ { sb.WriteString(cidHash) sb.WriteByte(btsep) } - sb.WriteString(jsa.nuid.Next()) - jsa.mu.Unlock() - + sb.WriteString(uid) reply := sb.String() ch := make(chan interface{}, 1) @@ -1797,9 +1857,7 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie seq, _, _ := ackReplyInfo(reply) // Handle this retained message - rf := &mqttRetainedMsgRef{} - rf.sseq = seq - as.handleRetainedMsg(rm.Subject, rf) + as.handleRetainedMsg(rm.Subject, &mqttRetainedMsgRef{sseq: seq}, rm) // If we were recovering (lastSeq > 0), then check if we are done. if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq { @@ -1937,6 +1995,40 @@ func (as *mqttAccountSessionManager) createSubscription(subject string, cb msgHa return nil } +// A timer loop to cleanup up expired cached retained messages for a given MQTT account. +// The closeCh is used by the caller to be able to interrupt this routine +// if the rest of the initialization fails, since the quitCh is really +// only used when the server shutdown. +// +// No lock held on entry. +func (as *mqttAccountSessionManager) cleaupRetainedMessageCache(s *Server, closeCh chan struct{}) { + tt := time.NewTicker(mqttRetainedCacheTTL) + defer tt.Stop() + for { + select { + case <-tt.C: + // Set a limit to the number of retained messages to scan since we + // lock as for it. Since the map enumeration gives random order we + // should eventually clean up everything. + i, maxScan := 0, 10*1000 + now := time.Now() + as.rmsCache.Range(func(key, value interface{}) bool { + rm := value.(mqttRetainedMsg) + if now.After(rm.expiresFromCache) { + as.rmsCache.Delete(key) + } + i++ + return i < maxScan + }) + + case <-closeCh: + return + case <-s.quitCh: + return + } + } +} + // Loop to send JS API requests for a given MQTT account. // The closeCh is used by the caller to be able to interrupt this routine // if the rest of the initialization fails, since the quitCh is really @@ -2031,7 +2123,7 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc // If a message for this topic already existed, the existing record is updated // with the provided information. // Lock not held on entry. 
-func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) { +func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rf *mqttRetainedMsgRef, rm *mqttRetainedMsg) { as.mu.Lock() defer as.mu.Unlock() if as.retmsgs == nil { @@ -2042,11 +2134,11 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai if erm, exists := as.retmsgs[key]; exists { // If the new sequence is below the floor or the existing one, // then ignore the new one. - if rm.sseq <= erm.sseq || rm.sseq <= erm.floor { + if rf.sseq <= erm.sseq || rf.sseq <= erm.floor { return } // Capture existing sequence number so we can return it as the old sequence. - erm.sseq = rm.sseq + erm.sseq = rf.sseq // Clear the floor erm.floor = 0 // If sub is nil, it means that it was removed from sublist following a @@ -2055,12 +2147,23 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai erm.sub = &subscription{subject: []byte(key)} as.sl.Insert(erm.sub) } + + // Update the in-memory retained message cache but only for messages + // that are already in the cache, i.e. have been (recently) used. + if rm != nil { + if _, ok := as.rmsCache.Load(key); ok { + toStore := *rm + toStore.expiresFromCache = time.Now().Add(mqttRetainedCacheTTL) + as.rmsCache.Store(key, toStore) + } + } return } } - rm.sub = &subscription{subject: []byte(key)} - as.retmsgs[key] = rm - as.sl.Insert(rm.sub) + + rf.sub = &subscription{subject: []byte(key)} + as.retmsgs[key] = rf + as.sl.Insert(rf.sub) } // Removes the retained message for the given `subject` if present, and returns the @@ -2077,6 +2180,7 @@ func (as *mqttAccountSessionManager) handleRetainedMsgDel(subject string, seq ui as.sl = NewSublistWithCache() } if erm, ok := as.retmsgs[subject]; ok { + as.rmsCache.Delete(subject) if erm.sub != nil { as.sl.Remove(erm.sub) erm.sub = nil @@ -2164,10 +2268,39 @@ func (as *mqttAccountSessionManager) removeSession(sess *mqttSession, lock bool) } } -// Helpers that sets the sub's mqtt fields and possibly serialize -// (pre-loaded) retained messages. -// Session lock held on entry. -func (sess *mqttSession) processSub(c *client, subject, sid []byte, isReserved bool, qos byte, jsDurName string, h msgHandler, initShadow bool) (*subscription, error) { +// Helper to set the sub's mqtt fields and possibly serialize (pre-loaded) +// retained messages. +// +// Session lock held on entry. Acquires the subs lock and holds it for +// the duration. Non-MQTT messages coming into mqttDeliverMsgCbQoS0 will be +// waiting. +func (sess *mqttSession) processSub( + // subscribing client. + c *client, + // subscription parameters. + subject, sid []byte, isReserved bool, qos byte, jsDurName string, h msgHandler, + // do we need to scan for shadow subscriptions? (we don't do it for QOS1+) + initShadow bool, + // len(rms) > 0 means to deliver retained messages for the subscription. + rms map[string]*mqttRetainedMsg, + // trace serialized retained messages in the log. + trace bool, + // the retained messages are kept in the account session manager. + as *mqttAccountSessionManager, +) (*subscription, error) { + start := time.Now() + defer func() { + elapsed := time.Since(start) + if elapsed > mqttProcessSubTooLong { + c.Warnf("Took too long to process subscription for %q: %v", subject, elapsed) + } + }() + + // Hold subsMu to prevent QOS0 messages callback from doing anything until + // the (MQTT) sub is initialized. 
+ sess.subsMu.Lock() + defer sess.subsMu.Unlock() + sub, err := c.processSub(subject, nil, sid, h, false) if err != nil { // c.processSub already called c.Errorf(), so no need here. @@ -2179,12 +2312,24 @@ func (sess *mqttSession) processSub(c *client, subject, sid []byte, isReserved b } for _, ss := range subs { if ss.mqtt == nil { - ss.mqtt = &mqttSub{} + // reserved is set only once and once the subscription has been + // created it can be considered immutable. + ss.mqtt = &mqttSub{ + reserved: isReserved, + } } + // QOS and jsDurName can be changed on an existing subscription, so + // accessing it later requires a lock. ss.mqtt.qos = qos - ss.mqtt.reserved = isReserved ss.mqtt.jsDur = jsDurName } + + if len(rms) > 0 { + for _, ss := range subs { + as.serializeRetainedMsgsForSub(rms, sess, c, ss, trace) + } + } + return sub, nil } @@ -2297,12 +2442,6 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client, sess.cons[sid] = cc } - serializeRMS := func(sub *subscription) { - for _, ss := range append([]*subscription{sub}, sub.shadow...) { - as.serializeRetainedMsgsForSub(rms, sess, c, ss, trace) - } - } - var err error subs := make([]*subscription, 0, len(filters)) for _, f := range filters { @@ -2315,21 +2454,28 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client, bsubject := []byte(subject) sid := subject bsid := bsubject + isReserved := isMQTTReservedSubscription(subject) var jscons *ConsumerConfig var jssub *subscription // Note that if a subscription already exists on this subject, the // existing sub is returned. Need to update the qos. + var sub *subscription + var err error + + const processShadowSubs = true + as.mu.Lock() sess.mu.Lock() - sub, err := sess.processSub(c, bsubject, bsid, - isMQTTReservedSubscription(subject), f.qos, _EMPTY_, mqttDeliverMsgCbQoS0, true) - if err == nil && fromSubProto { - serializeRMS(sub) - } + sub, err = sess.processSub(c, + bsubject, bsid, isReserved, f.qos, // main subject + _EMPTY_, mqttDeliverMsgCbQoS0, // no jsDur for QOS0 + processShadowSubs, + rms, trace, as) // rms is empty if not fromSubProto sess.mu.Unlock() as.mu.Unlock() + if err != nil { f.qos = mqttSubAckFailure sess.cleanupFailedSub(c, sub, jscons, jssub) @@ -2356,11 +2502,11 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client, // See note above about existing subscription. as.mu.Lock() sess.mu.Lock() - fwcsub, err = sess.processSub(c, []byte(fwcsubject), []byte(fwcsid), - isMQTTReservedSubscription(subject), f.qos, _EMPTY_, mqttDeliverMsgCbQoS0, true) - if err == nil && fromSubProto { - serializeRMS(fwcsub) - } + fwcsub, err = sess.processSub(c, + []byte(fwcsubject), []byte(fwcsid), isReserved, f.qos, // FWC (top-level wildcard) subject + _EMPTY_, mqttDeliverMsgCbQoS0, // no jsDur for QOS0 + processShadowSubs, + rms, trace, as) // rms is empty if not fromSubProto sess.mu.Unlock() as.mu.Unlock() if err != nil { @@ -2411,14 +2557,13 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(rms map[string] return } for _, psub := range result.psubs { - rm, ok := rms[string(psub.subject)] - if !ok { + + rm := rms[string(psub.subject)] + if rm == nil { + // This should not happen since we pre-load messages into the cache + // before calling serialize. 
continue } - if sub.mqtt.prm == nil { - sub.mqtt.prm = &mqttWriter{} - } - prm := sub.mqtt.prm var pi uint16 qos := mqttGetQoS(rm.Flags) if qos > sub.mqtt.qos { @@ -2441,7 +2586,10 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(rms map[string] // Need to use the subject for the retained message, not the `sub` subject. // We can find the published retained message in rm.sub.subject. // Set the RETAIN flag: [MQTT-3.3.1-8]. - flags := mqttSerializePublishMsg(prm, pi, qos, false, true, []byte(rm.Topic), rm.Msg) + flags, headerBytes := mqttMakePublishHeader(pi, qos, false, true, []byte(rm.Topic), len(rm.Msg)) + c.mu.Lock() + sub.mqtt.prm = append(sub.mqtt.prm, headerBytes, rm.Msg) + c.mu.Unlock() if trace { pp := mqttPublish{ topic: []byte(rm.Topic), @@ -2475,6 +2623,16 @@ func (as *mqttAccountSessionManager) loadRetainedMessagesForSubject(rms map[stri if rms[subject] != nil { continue // already loaded } + + // See if we have the retained message in the cache. + if rmv, _ := as.rmsCache.Load(subject); rmv != nil { + rm := rmv.(mqttRetainedMsg) + rms[subject] = &rm + continue + } + + // Load the retained message from the stream, and cache it for reuse in + // the near future. loadSubject := mqttRetainedMsgsStreamSubject + subject jsm, err := as.jsa.loadLastMsgFor(mqttRetainedMsgsStreamName, loadSubject) if err != nil || jsm == nil { @@ -2486,6 +2644,10 @@ func (as *mqttAccountSessionManager) loadRetainedMessagesForSubject(rms map[stri log.Warnf("failed to decode retained message for subject %q: %v", loadSubject, err) continue } + + // Add the loaded retained message to the cache. + rm.expiresFromCache = time.Now().Add(mqttRetainedCacheTTL) + as.rmsCache.Store(subject, rm) rms[subject] = &rm } } @@ -2616,55 +2778,59 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve retry = false } -func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) bool { +func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) error { jsa := &as.jsa - var count, errors int + var processed int + var transferred int + start := time.Now() + deadline := start.Add(mqttRetainedTransferTimeout) for { // Try and look up messages on the original undivided "$MQTT.rmsgs" subject. // If nothing is returned here, we assume to have migrated all old messages. smsg, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs") - if err != nil { - if IsNatsErr(err, JSNoMessageFoundErr) { - // We've ran out of messages to transfer so give up. - break - } - log.Warnf(" Unable to load retained message from '$MQTT.rmsgs': %s", err) - errors++ + if IsNatsErr(err, JSNoMessageFoundErr) { + // We've ran out of messages to transfer, done. break } + if err != nil { + log.Warnf(" Unable to transfer a retained message: failed to load from '$MQTT.rmsgs': %s", err) + return err + } + // Unmarshal the message so that we can obtain the subject name. var rmsg mqttRetainedMsg - if err := json.Unmarshal(smsg.Data, &rmsg); err != nil { + if err = json.Unmarshal(smsg.Data, &rmsg); err == nil { + // Store the message again, this time with the new per-key subject. 
+ subject := mqttRetainedMsgsStreamSubject + rmsg.Subject + if _, err = jsa.storeMsg(subject, 0, smsg.Data); err != nil { + log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err) + } + transferred++ + } else { log.Warnf(" Unable to unmarshal retained message with sequence %d, skipping", smsg.Sequence) - errors++ - continue } - // Store the message again, this time with the new per-key subject. - subject := mqttRetainedMsgsStreamSubject + rmsg.Subject - if _, err := jsa.storeMsg(subject, 0, smsg.Data); err != nil { - log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err) - errors++ - continue - } // Delete the original message. if err := jsa.deleteMsg(mqttRetainedMsgsStreamName, smsg.Sequence, true); err != nil { log.Errorf(" Unable to clean up the retained message with sequence %d: %v", smsg.Sequence, err) - errors++ - continue + return err + } + processed++ + + now := time.Now() + if now.After(deadline) { + err := fmt.Errorf("timed out while transferring retained messages from '$MQTT.rmsgs' after %v, %d processed, %d successfully transferred", now.Sub(start), processed, transferred) + log.Noticef(err.Error()) + return err } - count++ } - if errors > 0 { - next := mqttDefaultTransferRetry - log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next) - time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) }) - } else if count > 0 { - log.Noticef("Transfer of %d MQTT retained messages done!", count) + if processed > 0 { + log.Noticef("Processed %d messages from '$MQTT.rmsgs', successfully transferred %d in %v", processed, transferred, time.Since(start)) + } else { + log.Debugf("No messages found to transfer from '$MQTT.rmsgs'") } - // Signal if there was any activity (either some transferred or some errors) - return errors > 0 || count > 0 + return nil } ////////////////////////////////////////////////////////////////////////////// @@ -3848,7 +4014,7 @@ func (c *client) mqttHandlePubRetain() { Origin: asm.jsa.id, Subject: key, Topic: string(pp.topic), - Msg: copyBytes(pp.msg), + Msg: pp.msg, Flags: pp.flags, Source: c.opts.Username, } @@ -3860,7 +4026,7 @@ func (c *client) mqttHandlePubRetain() { sseq: smr.Sequence, } // Add/update the map - asm.handleRetainedMsg(key, rf) + asm.handleRetainedMsg(key, rf, rm) } else { c.mu.Lock() acc := c.acc @@ -4016,28 +4182,6 @@ func pubAllowed(perms *perm, subject string) bool { return allowed } -func mqttWritePublish(w *mqttWriter, qos byte, dup, retain bool, subject string, pi uint16, payload []byte) { - flags := qos << 1 - if dup { - flags |= mqttPubFlagDup - } - if retain { - flags |= mqttPubFlagRetain - } - - w.WriteByte(mqttPacketPub | flags) - pkLen := 2 + len(subject) + len(payload) - if qos > 0 { - pkLen += 2 - } - w.WriteVarInt(pkLen) - w.WriteString(subject) - if qos > 0 { - w.WriteUint16(pi) - } - w.Write([]byte(payload)) -} - func (c *client) mqttEnqueuePubResponse(packetType byte, pi uint16, trace bool) { proto := [4]byte{packetType, 0x2, 0, 0} proto[2] = byte(pi >> 8) @@ -4271,32 +4415,32 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re return } - hdr, msg := pc.msgParts(rmsg) - // This is the client associated with the subscription. cc := sub.client // This is immutable sess := cc.mqtt.sess - // Check the subscription's QoS. This needs to be protected because - // the client may change an existing subscription at any time. 
- sess.mu.Lock() + // Lock here, otherwise we may be called with sub.mqtt == nil. Ignore + // wildcard subscriptions if this subject starts with '$', per Spec + // [MQTT-4.7.2-1]. + sess.subsMu.RLock() subQoS := sub.mqtt.qos - isReservedSub := mqttIsReservedSub(sub, subject) - sess.mu.Unlock() + ignore := mqttMustIgnoreForReservedSub(sub, subject) + sess.subsMu.RUnlock() - // We have a wildcard subscription and this subject starts with '$' so ignore per Spec [MQTT-4.7.2-1]. - if isReservedSub { + if ignore { return } + hdr, msg := pc.msgParts(rmsg) var topic []byte if pc.isMqtt() { // This is an MQTT publisher directly connected to this server. - // If the message was published with a QoS > 0 and the sub has the QoS > - // 0 then the message will be delivered by the other callback. + // Check the subscription's QoS. If the message was published with a + // QoS>0 and the sub has the QoS>0 then the message will be delivered by + // mqttDeliverMsgCbQoS12. msgQoS := mqttGetQoS(pc.mqtt.pp.flags) if subQoS > 0 && msgQoS > 0 { return @@ -4310,8 +4454,13 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re } else { // Non MQTT client, could be NATS publisher, or ROUTER, etc.. h := mqttParsePublishNATSHeader(hdr) + + // If the message does not have the MQTT header, it is not a MQTT and + // should be delivered here, at QOS0. If it does have the header, we + // need to lock the session to check the sub QoS, and then ignore the + // message if the Sub wants higher QOS delivery. It will be delivered by + // mqttDeliverMsgCbQoS12. if subQoS > 0 && h != nil && h.qos > 0 { - // will be delivered by the JetStream callback return } @@ -4356,8 +4505,9 @@ func mqttDeliverMsgCbQoS12(sub *subscription, pc *client, _ *Account, subject, r // This is immutable sess := cc.mqtt.sess - // We lock to check some of the subscription's fields and if we need to - // keep track of pending acks, etc.. + // We lock to check some of the subscription's fields and if we need to keep + // track of pending acks, etc. There is no need to acquire the subsMu RLock + // since sess.Lock is overarching for modifying subscriptions. sess.mu.Lock() if sess.c != cc || sub.mqtt == nil { sess.mu.Unlock() @@ -4382,8 +4532,7 @@ func mqttDeliverMsgCbQoS12(sub *subscription, pc *client, _ *Account, subject, r // Check for reserved subject violation. If so, we will send the ack to // remove the message, and do nothing else. strippedSubj := string(subject[len(mqttStreamSubjectPrefix):]) - isReservedSub := mqttIsReservedSub(sub, strippedSubj) - if isReservedSub { + if mqttMustIgnoreForReservedSub(sub, strippedSubj) { sess.mu.Unlock() sess.jsa.sendAck(reply) return @@ -4432,12 +4581,12 @@ func mqttDeliverPubRelCb(sub *subscription, pc *client, _ *Account, subject, rep cc.mqttEnqueuePubResponse(mqttPacketPubRel, pi, trace) } -// The MQTT Server MUST NOT match Topic Filters starting with a wildcard character (# or +) -// with Topic Names beginning with a $ character, Spec [MQTT-4.7.2-1]. -// We will return true if there is a violation. +// The MQTT Server MUST NOT match Topic Filters starting with a wildcard +// character (# or +) with Topic Names beginning with a $ character, Spec +// [MQTT-4.7.2-1]. We will return true if there is a violation. // -// Session lock must be held on entry to protect access to sub.mqtt.reserved. -func mqttIsReservedSub(sub *subscription, subject string) bool { +// Session or subMu lock must be held on entry to protect access to sub.mqtt. 
+func mqttMustIgnoreForReservedSub(sub *subscription, subject string) bool { // If the subject does not start with $ nothing to do here. if !sub.mqtt.reserved || len(subject) == 0 || subject[0] != mqttReservedPre { return false @@ -4460,17 +4609,17 @@ func isMQTTReservedSubscription(subject string) bool { // Common function to mqtt delivery callbacks to serialize and send the message // to the `cc` client. func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup bool, topic, msg []byte) { - sw := mqttWriter{} - w := &sw - - flags := mqttSerializePublishMsg(w, pi, qos, dup, false, topic, msg) + flags, headerBytes := mqttMakePublishHeader(pi, qos, dup, false, topic, len(msg)) cc.mu.Lock() if sub.mqtt.prm != nil { - cc.queueOutbound(sub.mqtt.prm.Bytes()) + for _, data := range sub.mqtt.prm { + cc.queueOutbound(data) + } sub.mqtt.prm = nil } - cc.queueOutbound(w.Bytes()) + cc.queueOutbound(headerBytes) + cc.queueOutbound(msg) c.addToPCD(cc) trace := cc.trace cc.mu.Unlock() @@ -4487,10 +4636,9 @@ func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint1 } // Serializes to the given writer the message for the given subject. -func mqttSerializePublishMsg(w *mqttWriter, pi uint16, qos byte, dup, retained bool, topic, msg []byte) byte { +func (w *mqttWriter) WritePublishHeader(pi uint16, qos byte, dup, retained bool, topic []byte, msgLen int) byte { // Compute len (will have to add packet id if message is sent as QoS>=1) - pkLen := 2 + len(topic) + len(msg) - + pkLen := 2 + len(topic) + msgLen var flags byte // Set flags for dup/retained/qos1 @@ -4500,8 +4648,7 @@ func mqttSerializePublishMsg(w *mqttWriter, pi uint16, qos byte, dup, retained b if retained { flags |= mqttPubFlagRetain } - // For now, we have only QoS 1 - if pi > 0 { + if qos > 0 { pkLen += 2 flags |= qos << 1 } @@ -4509,14 +4656,20 @@ func mqttSerializePublishMsg(w *mqttWriter, pi uint16, qos byte, dup, retained b w.WriteByte(mqttPacketPub | flags) w.WriteVarInt(pkLen) w.WriteBytes(topic) - if pi > 0 { + if qos > 0 { w.WriteUint16(pi) } - w.Write(msg) return flags } +// Serializes to the given writer the message for the given subject. +func mqttMakePublishHeader(pi uint16, qos byte, dup, retained bool, topic []byte, msgLen int) (byte, []byte) { + headerBuf := newMQTTWriter(mqttInitialPubHeader + len(topic)) + flags := headerBuf.WritePublishHeader(pi, qos, dup, retained, topic, msgLen) + return flags, headerBuf.Bytes() +} + // Process the SUBSCRIBE packet. 
// // Process the list of subscriptions and update the given filter @@ -4741,7 +4894,8 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string, sess.mu.Lock() sess.tmaxack = tmaxack sub, err := sess.processSub(c, []byte(inbox), []byte(inbox), - isMQTTReservedSubscription(subject), qos, cc.Durable, mqttDeliverMsgCbQoS12, false) + isMQTTReservedSubscription(subject), qos, cc.Durable, mqttDeliverMsgCbQoS12, + false, nil, false, nil) // no shadow subs, no retained message delivery sess.mu.Unlock() if err != nil { @@ -4758,7 +4912,9 @@ func (c *client) mqttSendRetainedMsgsToNewSubs(subs []*subscription) { c.mu.Lock() for _, sub := range subs { if sub.mqtt != nil && sub.mqtt.prm != nil { - c.queueOutbound(sub.mqtt.prm.Bytes()) + for _, data := range sub.mqtt.prm { + c.queueOutbound(data) + } sub.mqtt.prm = nil } } @@ -4767,7 +4923,7 @@ func (c *client) mqttSendRetainedMsgsToNewSubs(subs []*subscription) { } func (c *client) mqttEnqueueSubAck(pi uint16, filters []*mqttFilter) { - w := &mqttWriter{} + w := newMQTTWriter(7 + len(filters)) w.WriteByte(mqttPacketSubAck) // packet length is 2 (for packet identifier) and 1 byte per filter. w.WriteVarInt(2 + len(filters)) @@ -4849,7 +5005,7 @@ func (c *client) mqttProcessUnsubs(filters []*mqttFilter) error { } func (c *client) mqttEnqueueUnsubAck(pi uint16) { - w := &mqttWriter{} + w := newMQTTWriter(4) w.WriteByte(mqttPacketUnsubAck) w.WriteVarInt(2) w.WriteUint16(pi) @@ -5190,3 +5346,9 @@ func (w *mqttWriter) WriteVarInt(value int) { } } } + +func newMQTTWriter(cap int) *mqttWriter { + w := &mqttWriter{} + w.Grow(cap) + return w +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 87bd00f94e..83ee6f04f7 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -3190,7 +3190,13 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { var success bool if eae, _ := n.loadEntry(ae.pindex); eae == nil { - n.resetWAL() + // If terms are equal, and we are not catching up, we have simply already processed this message. + // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. + if ae.pterm == n.pterm && !catchingUp { + success = true + } else { + n.resetWAL() + } } else { // If terms mismatched, or we got an error loading, delete that entry and all others past it. // Make sure to cancel any catchups in progress. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/route.go b/vendor/github.com/nats-io/nats-server/v2/server/route.go index 4cada7c8dd..e73f1bdf72 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/route.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/route.go @@ -146,27 +146,25 @@ func (c *client) removeReplySub(sub *subscription) { // Lookup the account based on sub.sid. if i := bytes.Index(sub.sid, []byte(" ")); i > 0 { // First part of SID for route is account name. 
- if v, ok := c.srv.accounts.Load(string(sub.sid[:i])); ok { + if v, ok := c.srv.accounts.Load(bytesToString(sub.sid[:i])); ok { (v.(*Account)).sl.Remove(sub) } c.mu.Lock() - delete(c.subs, string(sub.sid)) + delete(c.subs, bytesToString(sub.sid)) c.mu.Unlock() } } func (c *client) processAccountSub(arg []byte) error { - accName := string(arg) if c.kind == GATEWAY { - return c.processGatewayAccountSub(accName) + return c.processGatewayAccountSub(string(arg)) } return nil } func (c *client) processAccountUnsub(arg []byte) { - accName := string(arg) if c.kind == GATEWAY { - c.processGatewayAccountUnsub(accName) + c.processGatewayAccountUnsub(string(arg)) } } @@ -705,7 +703,7 @@ func (c *client) processRouteInfo(info *Info) { // First INFO, check if this server is configured for compression because // if that is the case, we need to negotiate it with the remote server. if needsCompression(opts.Cluster.Compression.Mode) { - accName := string(c.route.accName) + accName := bytesToString(c.route.accName) // If we did not yet negotiate... if !c.flags.isSet(compressionNegotiated) { // Prevent from getting back here. @@ -935,7 +933,7 @@ func (s *Server) updateRemoteRoutePerms(c *client, info *Info) { c.opts.Import = info.Import c.opts.Export = info.Export - routeAcc, poolIdx, noPool := string(c.route.accName), c.route.poolIdx, c.route.noPool + routeAcc, poolIdx, noPool := bytesToString(c.route.accName), c.route.poolIdx, c.route.noPool c.mu.Unlock() var ( @@ -1290,9 +1288,9 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { // RS- will have the arg exactly as the key. var key string if c.kind == ROUTER && c.route != nil && len(c.route.accName) > 0 { - key = accountName + " " + string(arg) + key = accountName + " " + bytesToString(arg) } else { - key = string(arg) + key = bytesToString(arg) } sub, ok := c.subs[key] if ok { @@ -1367,7 +1365,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { // If the account name is empty (not a "per-account" route), the account // is at the index prior to the subject. if accountName == _EMPTY_ { - accountName = string(args[subjIdx-1]) + accountName = bytesToString(args[subjIdx-1]) } // Lookup account while avoiding fetch. // A slow fetch delays subsequent remote messages. It also avoids the expired check (see below). @@ -1413,7 +1411,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { } // Check permissions if applicable. - if !c.canExport(string(sub.subject)) { + if c.perms != nil && !c.canExport(string(sub.subject)) { c.mu.Unlock() c.Debugf("Can not export %q, ignoring remote subscription request", sub.subject) return nil @@ -1451,7 +1449,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { sub.sid = append(sub.sid, ' ') sub.sid = append(sub.sid, sub.subject...) } - key := string(sub.sid) + key := bytesToString(sub.sid) acc.mu.RLock() // For routes (this can be called by leafnodes), check if the account is @@ -2232,6 +2230,9 @@ func handleDuplicateRoute(remote, c *client, setNoReconnect bool) { // Import filter check. 
func (c *client) importFilter(sub *subscription) bool { + if c.perms == nil { + return true + } return c.canImport(string(sub.subject)) } @@ -2854,7 +2855,7 @@ func (s *Server) removeRoute(c *client) { idHash = r.idHash gwURL = r.gatewayURL poolIdx = r.poolIdx - accName = string(r.accName) + accName = bytesToString(r.accName) if r.noPool { s.routesNoPool-- noPool = true diff --git a/vendor/github.com/nats-io/nats-server/v2/server/store.go b/vendor/github.com/nats-io/nats-server/v2/server/store.go index 3482abc8a9..7b433c7f53 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/store.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/store.go @@ -21,6 +21,7 @@ import ( "io" "strings" "time" + "unsafe" "github.com/nats-io/nats-server/v2/server/avl" ) @@ -714,3 +715,23 @@ func (sm *StoreMsg) clear() { sm.buf = sm.buf[:0] } } + +// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer. +// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same. +func bytesToString(b []byte) string { + if len(b) == 0 { + return _EMPTY_ + } + p := unsafe.SliceData(b) + return unsafe.String(p, len(b)) +} + +// Same in reverse. Used less often. +func stringToBytes(s string) []byte { + if len(s) == 0 { + return nil + } + p := unsafe.StringData(s) + b := unsafe.Slice(p, len(s)) + return b +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index 118e8a95ed..4bdd4868e5 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -5190,6 +5190,11 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) { mset.clsMu.Lock() o.mu.Lock() + if o.closed || o.mset == nil { + o.mu.Unlock() + return + } + if o.sigSubs != nil { if mset.csl != nil { for _, sub := range o.sigSubs { @@ -5206,13 +5211,13 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) { // If no filters are preset, add fwcs to sublist for that consumer. if newFilters == nil { sub := &subscription{subject: []byte(fwcs), icb: o.processStreamSignal} - o.mset.csl.Insert(sub) + mset.csl.Insert(sub) o.sigSubs = append(o.sigSubs, sub) // If there are filters, add their subjects to sublist. } else { for _, filter := range newFilters { sub := &subscription{subject: []byte(filter), icb: o.processStreamSignal} - o.mset.csl.Insert(sub) + mset.csl.Insert(sub) o.sigSubs = append(o.sigSubs, sub) } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/websocket.go b/vendor/github.com/nats-io/nats-server/v2/server/websocket.go index 0f45f91e33..e026674d9f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/websocket.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/websocket.go @@ -1269,13 +1269,12 @@ func (s *Server) createWSClient(conn net.Conn, ws *websocket) *client { } func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { - var nb net.Buffers + nb := c.out.nb var mfs int var usz int if c.ws.browser { mfs = wsFrameSizeForBrowsers } - nb = c.out.nb mask := c.ws.maskwrite // Start with possible already framed buffers (that we could have // got from partials or control messages such as ws pings or pongs). 
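The store.go hunk above introduces `bytesToString` and `stringToBytes`, zero-copy conversions built on the Go 1.20 `unsafe` helpers, and the route.go hunks switch map lookups and deletes over to them. As a rough standalone sketch (the `subs` map and the subject literal below are illustrative, not taken from the server), the pattern looks like this; the key point is that the resulting string aliases the byte slice's backing array, so it should only be used transiently, e.g. for a lookup, while the slice is alive and unmodified:

```go
package main

import (
	"fmt"
	"unsafe"
)

// bytesToString returns a string that shares b's backing array (no copy).
// The result must not outlive b, and b must not be mutated while in use.
func bytesToString(b []byte) string {
	if len(b) == 0 {
		return ""
	}
	return unsafe.String(unsafe.SliceData(b), len(b))
}

// stringToBytes is the reverse conversion; the returned slice must be
// treated as read-only because strings are immutable.
func stringToBytes(s string) []byte {
	if len(s) == 0 {
		return nil
	}
	return unsafe.Slice(unsafe.StringData(s), len(s))
}

func main() {
	// Illustrative subscription map keyed by "account sid".
	subs := map[string]struct{}{"ACC foo.bar": {}}

	sid := []byte("ACC foo.bar")
	// Transient lookup key: no allocation, no copy of sid.
	_, ok := subs[bytesToString(sid)]
	fmt.Println(ok, len(stringToBytes("abc")))
}
```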
@@ -1378,8 +1377,10 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { for i := 0; i < len(nb); i++ { b := nb[i] if total+len(b) <= mfs { - bufs = append(bufs, b) + buf := nbPoolGet(len(b)) + bufs = append(bufs, append(buf, b...)) total += len(b) + nbPoolPut(nb[i]) continue } for len(b) > 0 { @@ -1394,9 +1395,11 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if endStart { fhIdx = startFrame() } - bufs = append(bufs, b[:total]) + buf := nbPoolGet(total) + bufs = append(bufs, append(buf, b[:total]...)) b = b[total:] } + nbPoolPut(nb[i]) // No longer needed as copied into smaller frames. } if total > 0 { endFrame(fhIdx, total) diff --git a/vendor/modules.txt b/vendor/modules.txt index ec8477410a..7cb0b27535 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1219,8 +1219,8 @@ github.com/justinas/alice # github.com/kevinburke/ssh_config v1.2.0 ## explicit github.com/kevinburke/ssh_config -# github.com/klauspost/compress v1.17.2 -## explicit; go 1.18 +# github.com/klauspost/compress v1.17.4 +## explicit; go 1.19 github.com/klauspost/compress/flate github.com/klauspost/compress/s2 # github.com/klauspost/cpuid/v2 v2.1.0 @@ -1376,7 +1376,7 @@ github.com/mschoch/smat # github.com/nats-io/jwt/v2 v2.5.3 ## explicit; go 1.18 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.10.5 +# github.com/nats-io/nats-server/v2 v2.10.7 ## explicit; go 1.20 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/ldap
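The websocket hunks above change `wsCollapsePtoNB` so that each outbound chunk is copied into a frame obtained from the server's size-tiered buffer pool (`nbPoolGet`) and the source buffer is recycled with `nbPoolPut` once it has been copied, rather than keeping references into the caller's buffers. A minimal standalone sketch of that get-copy-put pattern, using a single-tier `sync.Pool` and hypothetical names (`frameGet`/`framePut`, `frameSize`) rather than the server's helpers:

```go
package main

import (
	"fmt"
	"sync"
)

const frameSize = 4096

// Single-tier pool of fixed-capacity frames; the server keys its choice of
// pool on the requested size instead.
var framePool = sync.Pool{
	New: func() any { return make([]byte, 0, frameSize) },
}

// frameGet returns an empty frame with at least sz capacity, falling back to
// a plain allocation for oversized requests.
func frameGet(sz int) []byte {
	if sz > frameSize {
		return make([]byte, 0, sz)
	}
	return framePool.Get().([]byte)[:0]
}

// framePut recycles a frame only if it has the pooled capacity.
func framePut(b []byte) {
	if cap(b) == frameSize {
		framePool.Put(b[:0])
	}
}

func main() {
	payload := []byte("chunk to be framed")

	// Get-copy-put: copy the chunk into a pooled frame so the original
	// buffer can be released or reused immediately.
	frame := append(frameGet(len(payload)), payload...)
	fmt.Printf("%d bytes framed: %q\n", len(frame), frame)

	// Once the frame has been flushed, return it for the next writer.
	framePut(frame)
}
```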