diff --git a/go.mod b/go.mod index 4f78e427d9..2537f1cffc 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.2.1 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.10.9 + github.com/nats-io/nats-server/v2 v2.10.10 github.com/nats-io/nats.go v1.32.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 @@ -249,7 +249,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/juliangruber/go-intersect v1.1.0 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.17.4 // indirect + github.com/klauspost/compress v1.17.5 // indirect github.com/klauspost/cpuid/v2 v2.1.0 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/libregraph/oidc-go v1.0.0 // indirect diff --git a/go.sum b/go.sum index 49e67dddf3..be0d8686f8 100644 --- a/go.sum +++ b/go.sum @@ -1588,8 +1588,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E= +github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -1745,8 +1745,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= -github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T17HWtwiFI= -github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ= +github.com/nats-io/nats-server/v2 v2.10.10 h1:g1Wd64J5SGsoqWSx1qoNu9/At7a2x+jE7Qtf2XpEx/I= +github.com/nats-io/nats-server/v2 v2.10.10/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA= github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0= github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= diff --git a/vendor/github.com/klauspost/compress/flate/deflate.go b/vendor/github.com/klauspost/compress/flate/deflate.go index de912e187c..66d1657d2c 100644 --- a/vendor/github.com/klauspost/compress/flate/deflate.go +++ b/vendor/github.com/klauspost/compress/flate/deflate.go @@ -212,7 +212,7 @@ func (d *compressor) writeBlockSkip(tok *tokens, index int, eof bool) error { // Should only be used after a start/reset. func (d *compressor) fillWindow(b []byte) { // Do not fill window if we are in store-only or huffman mode. - if d.level <= 0 { + if d.level <= 0 && d.level > -MinCustomWindowSize { return } if d.fast != nil { diff --git a/vendor/github.com/klauspost/compress/internal/race/norace.go b/vendor/github.com/klauspost/compress/internal/race/norace.go new file mode 100644 index 0000000000..affbbbb595 --- /dev/null +++ b/vendor/github.com/klauspost/compress/internal/race/norace.go @@ -0,0 +1,13 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !race + +package race + +func ReadSlice[T any](s []T) { +} + +func WriteSlice[T any](s []T) { +} diff --git a/vendor/github.com/klauspost/compress/internal/race/race.go b/vendor/github.com/klauspost/compress/internal/race/race.go new file mode 100644 index 0000000000..f5e240dcde --- /dev/null +++ b/vendor/github.com/klauspost/compress/internal/race/race.go @@ -0,0 +1,26 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build race + +package race + +import ( + "runtime" + "unsafe" +) + +func ReadSlice[T any](s []T) { + if len(s) == 0 { + return + } + runtime.RaceReadRange(unsafe.Pointer(&s[0]), len(s)*int(unsafe.Sizeof(s[0]))) +} + +func WriteSlice[T any](s []T) { + if len(s) == 0 { + return + } + runtime.RaceWriteRange(unsafe.Pointer(&s[0]), len(s)*int(unsafe.Sizeof(s[0]))) +} diff --git a/vendor/github.com/klauspost/compress/s2/decode.go b/vendor/github.com/klauspost/compress/s2/decode.go index 6c7feafcc6..264ffd0a9b 100644 --- a/vendor/github.com/klauspost/compress/s2/decode.go +++ b/vendor/github.com/klauspost/compress/s2/decode.go @@ -10,6 +10,8 @@ import ( "errors" "fmt" "strconv" + + "github.com/klauspost/compress/internal/race" ) var ( @@ -63,6 +65,10 @@ func Decode(dst, src []byte) ([]byte, error) { } else { dst = make([]byte, dLen) } + + race.WriteSlice(dst) + race.ReadSlice(src[s:]) + if s2Decode(dst, src[s:]) != 0 { return nil, ErrCorrupt } diff --git a/vendor/github.com/klauspost/compress/s2/encode_amd64.go b/vendor/github.com/klauspost/compress/s2/encode_amd64.go index ebc332ad5f..4f45206a4e 100644 --- a/vendor/github.com/klauspost/compress/s2/encode_amd64.go +++ b/vendor/github.com/klauspost/compress/s2/encode_amd64.go @@ -3,6 +3,8 @@ package s2 +import "github.com/klauspost/compress/internal/race" + const hasAmd64Asm = true // encodeBlock encodes a non-empty src to a guaranteed-large-enough dst. It @@ -14,6 +16,9 @@ const hasAmd64Asm = true // len(dst) >= MaxEncodedLen(len(src)) && // minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize func encodeBlock(dst, src []byte) (d int) { + race.ReadSlice(src) + race.WriteSlice(dst) + const ( // Use 12 bit table when less than... limit12B = 16 << 10 @@ -50,6 +55,9 @@ func encodeBlock(dst, src []byte) (d int) { // len(dst) >= MaxEncodedLen(len(src)) && // minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize func encodeBlockBetter(dst, src []byte) (d int) { + race.ReadSlice(src) + race.WriteSlice(dst) + const ( // Use 12 bit table when less than... limit12B = 16 << 10 @@ -86,6 +94,9 @@ func encodeBlockBetter(dst, src []byte) (d int) { // len(dst) >= MaxEncodedLen(len(src)) && // minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize func encodeBlockSnappy(dst, src []byte) (d int) { + race.ReadSlice(src) + race.WriteSlice(dst) + const ( // Use 12 bit table when less than... limit12B = 16 << 10 @@ -121,6 +132,9 @@ func encodeBlockSnappy(dst, src []byte) (d int) { // len(dst) >= MaxEncodedLen(len(src)) && // minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize func encodeBlockBetterSnappy(dst, src []byte) (d int) { + race.ReadSlice(src) + race.WriteSlice(dst) + const ( // Use 12 bit table when less than... limit12B = 16 << 10 diff --git a/vendor/github.com/klauspost/compress/s2/reader.go b/vendor/github.com/klauspost/compress/s2/reader.go index 2f01a3987f..46ead58fe0 100644 --- a/vendor/github.com/klauspost/compress/s2/reader.go +++ b/vendor/github.com/klauspost/compress/s2/reader.go @@ -104,12 +104,14 @@ func ReaderIgnoreStreamIdentifier() ReaderOption { // For each chunk with the ID, the callback is called with the content. // Any returned non-nil error will abort decompression. // Only one callback per ID is supported, latest sent will be used. +// You can peek the stream, triggering the callback, by doing a Read with a 0 +// byte buffer. func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption { return func(r *Reader) error { if id < 0x80 || id > 0xfd { return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)") } - r.skippableCB[id] = fn + r.skippableCB[id-0x80] = fn return nil } } @@ -128,7 +130,7 @@ type Reader struct { err error decoded []byte buf []byte - skippableCB [0x80]func(r io.Reader) error + skippableCB [0xff - 0x80]func(r io.Reader) error blockStart int64 // Uncompressed offset at start of current. index *Index @@ -201,7 +203,7 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { // The supplied slice does not need to be the size of the read. func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) { if id < 0x80 { - r.err = fmt.Errorf("interbal error: skippable id < 0x80") + r.err = fmt.Errorf("internal error: skippable id < 0x80") return false } if fn := r.skippableCB[id-0x80]; fn != nil { @@ -1048,15 +1050,17 @@ func (r *Reader) ReadByte() (byte, error) { } // SkippableCB will register a callback for chunks with the specified ID. -// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive). +// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive). // For each chunk with the ID, the callback is called with the content. // Any returned non-nil error will abort decompression. // Only one callback per ID is supported, latest sent will be used. // Sending a nil function will disable previous callbacks. +// You can peek the stream, triggering the callback, by doing a Read with a 0 +// byte buffer. func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error { - if id < 0x80 || id > chunkTypePadding { + if id < 0x80 || id >= chunkTypePadding { return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)") } - r.skippableCB[id] = fn + r.skippableCB[id-0x80] = fn return nil } diff --git a/vendor/github.com/klauspost/compress/s2/s2.go b/vendor/github.com/klauspost/compress/s2/s2.go index dae3f731fa..72bcb49453 100644 --- a/vendor/github.com/klauspost/compress/s2/s2.go +++ b/vendor/github.com/klauspost/compress/s2/s2.go @@ -37,6 +37,8 @@ package s2 import ( "bytes" "hash/crc32" + + "github.com/klauspost/compress/internal/race" ) /* @@ -112,6 +114,8 @@ var crcTable = crc32.MakeTable(crc32.Castagnoli) // crc implements the checksum specified in section 3 of // https://github.com/google/snappy/blob/master/framing_format.txt func crc(b []byte) uint32 { + race.ReadSlice(b) + c := crc32.Update(0, crcTable, b) return c>>15 | c<<17 + 0xa282ead8 } diff --git a/vendor/github.com/klauspost/compress/s2/writer.go b/vendor/github.com/klauspost/compress/s2/writer.go index 089cd36d8c..bba66a8766 100644 --- a/vendor/github.com/klauspost/compress/s2/writer.go +++ b/vendor/github.com/klauspost/compress/s2/writer.go @@ -13,6 +13,8 @@ import ( "io" "runtime" "sync" + + "github.com/klauspost/compress/internal/race" ) const ( @@ -271,7 +273,7 @@ func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { return fmt.Errorf("skippable block excessed maximum size") } var header [4]byte - chunkLen := 4 + len(data) + chunkLen := len(data) header[0] = id header[1] = uint8(chunkLen >> 0) header[2] = uint8(chunkLen >> 8) @@ -282,7 +284,7 @@ func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { if err = w.err(err); err != nil { return err } - if n != len(data) { + if n != len(b) { return w.err(io.ErrShortWrite) } w.written += int64(n) @@ -303,9 +305,7 @@ func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { if err := write(header[:]); err != nil { return err } - if err := write(data); err != nil { - return err - } + return write(data) } // Create output... @@ -385,6 +385,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { buf = buf[len(uncompressed):] // Get an output buffer. obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen] + race.WriteSlice(obuf) + output := make(chan result) // Queue output now, so we keep order. w.output <- output @@ -393,6 +395,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { } w.uncompWritten += int64(len(uncompressed)) go func() { + race.ReadSlice(uncompressed) + checksum := crc(uncompressed) // Set to uncompressed. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go index b45cec6f91..216bf6623d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go @@ -1877,6 +1877,10 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im rt := Singleton var lat *serviceLatency + if dest == nil { + return nil, ErrMissingAccount + } + dest.mu.RLock() se := dest.getServiceExport(to) if se != nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/auth_callout.go b/vendor/github.com/nats-io/nats-server/v2/server/auth_callout.go index 359914c633..6620a4e512 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/auth_callout.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/auth_callout.go @@ -230,7 +230,6 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize } } } - return targetAcc, nil } @@ -270,6 +269,14 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize return } + // the JWT is cleared, because if in operator mode it may hold the JWT + // for the bearer token that connected to the callout if in operator mode + // the permissions are already set on the client, this prevents a decode + // on c.RegisterNKeyUser which would have wrong values + c.mu.Lock() + c.opts.JWT = _EMPTY_ + c.mu.Unlock() + // Build internal user and bind to the targeted account. nkuser := buildInternalNkeyUser(arc, allowedConnTypes, targetAcc) if err := c.RegisterNkeyUser(nkuser); err != nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index 4ef424cbb0..348c0478fe 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -4089,6 +4089,12 @@ func getHeader(key string, hdr []byte) []byte { return value } +// For bytes.HasPrefix below. +var ( + jsRequestNextPreB = []byte(jsRequestNextPre) + jsDirectGetPreB = []byte(jsDirectGetPre) +) + // processServiceImport is an internal callback when a subscription matches an imported service // from another account. This includes response mappings as well. func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) { @@ -4110,8 +4116,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt var checkJS bool shouldReturn := si.invalid || acc.sl == nil if !shouldReturn && !isResponse && si.to == jsAllAPI { - subj := bytesToString(c.pa.subject) - if strings.HasPrefix(subj, jsRequestNextPre) || strings.HasPrefix(subj, jsDirectGetPre) { + if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) { checkJS = true } } @@ -4790,7 +4795,11 @@ func (c *client) processPingTimer() { var sendPing bool - pingInterval := c.srv.getOpts().PingInterval + opts := c.srv.getOpts() + pingInterval := opts.PingInterval + if c.kind == ROUTER && opts.Cluster.PingInterval > 0 { + pingInterval = opts.Cluster.PingInterval + } pingInterval = adjustPingInterval(c.kind, pingInterval) now := time.Now() needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL @@ -4810,7 +4819,11 @@ func (c *client) processPingTimer() { if sendPing { // Check for violation - if c.ping.out+1 > c.srv.getOpts().MaxPingsOut { + maxPingsOut := opts.MaxPingsOut + if c.kind == ROUTER && opts.Cluster.MaxPingsOut > 0 { + maxPingsOut = opts.Cluster.MaxPingsOut + } + if c.ping.out+1 > maxPingsOut { c.Debugf("Stale Client Connection - Closing") c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection"))) c.mu.Unlock() @@ -4847,7 +4860,11 @@ func (c *client) setPingTimer() { if c.srv == nil { return } - d := c.srv.getOpts().PingInterval + opts := c.srv.getOpts() + d := opts.PingInterval + if c.kind == ROUTER && opts.Cluster.PingInterval > 0 { + d = opts.Cluster.PingInterval + } d = adjustPingInterval(c.kind, d) c.ping.tmr = time.AfterFunc(d, c.processPingTimer) } @@ -5740,6 +5757,9 @@ func (c *client) setFirstPingTimer() { opts := s.getOpts() d := opts.PingInterval + if c.kind == ROUTER && opts.Cluster.PingInterval > 0 { + d = opts.Cluster.PingInterval + } if !opts.DisableShortFirstPing { if c.kind != CLIENT { if d > firstPingInterval { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index dea5c9e9b7..238a2e5df9 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.10.9" + VERSION = "2.10.10" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index 02a8443e7c..a9ad0ebc41 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -4384,8 +4384,9 @@ func (o *consumer) trackPending(sseq, dseq uint64) { o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending) } if p, ok := o.pending[sseq]; ok { + // Update timestamp but keep original consumer delivery sequence. + // So do not update p.Sequence. p.Timestamp = time.Now().UnixNano() - p.Sequence = dseq } else { o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()} } @@ -4606,6 +4607,8 @@ func (o *consumer) checkPending() { o.rdq = nil o.rdqi.Empty() o.pending = nil + // Mimic behavior in processAckMsg when pending is empty. + o.adflr, o.asflr = o.dseq-1, o.sseq-1 } // Update our state if needed. @@ -4936,6 +4939,7 @@ func (o *consumer) purge(sseq uint64, slseq uint64) { // This means we can reset everything at this point. if len(o.pending) == 0 { o.pending, o.rdc = nil, nil + o.adflr, o.asflr = o.dseq-1, o.sseq-1 } // We need to remove all those being queued for redelivery under o.rdq @@ -5330,33 +5334,42 @@ func (o *consumer) isMonitorRunning() bool { return o.inMonitor } +// If we detect that our ackfloor is higher than the stream's last sequence, return this error. +var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence") + // If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent. -func (o *consumer) checkStateForInterestStream() { - o.mu.Lock() +func (o *consumer) checkStateForInterestStream() error { + o.mu.RLock() // See if we need to process this update if our parent stream is not a limits policy stream. mset := o.mset shouldProcessState := mset != nil && o.retention != LimitsPolicy if o.closed || !shouldProcessState { - o.mu.Unlock() - return + o.mu.RUnlock() + return nil } state, err := o.store.State() - o.mu.Unlock() + o.mu.RUnlock() if err != nil { - return + return err } asflr := state.AckFloor.Stream // Protect ourselves against rolling backwards. if asflr&(1<<63) != 0 { - return + return nil } // We should make sure to update the acks. var ss StreamState mset.store.FastState(&ss) + // Check if the underlying stream's last sequence is less than our floor. + // This can happen if the stream has been reset and has not caught up yet. + if asflr > ss.LastSeq { + return errAckFloorHigherThanLastSeq + } + for seq := ss.FirstSeq; seq <= asflr; seq++ { mset.ackMsg(o, seq) } @@ -5374,4 +5387,5 @@ func (o *consumer) checkStateForInterestStream() { } } } + return nil } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go index 66d744d451..391e677cad 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go @@ -1625,7 +1625,6 @@ func (s *Server) shutdownEventing() { // internal send loop to exit. s.sendShutdownEvent() wg.Wait() - close(rc) s.mu.Lock() defer s.mu.Unlock() @@ -1637,6 +1636,9 @@ func (s *Server) shutdownEventing() { }) // Turn everything off here. s.sys = nil + // Make sure this is done after s.sys = nil, so that we don't + // get sends to closed channels on badly-timed config reloads. + close(rc) } // Request for our local connection count. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index 8b73d353de..861fe4dc9f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -40,6 +40,7 @@ import ( "github.com/klauspost/compress/s2" "github.com/minio/highwayhash" "github.com/nats-io/nats-server/v2/server/avl" + "github.com/nats-io/nats-server/v2/server/stree" "golang.org/x/crypto/chacha20" "golang.org/x/crypto/chacha20poly1305" ) @@ -176,13 +177,14 @@ type fileStore struct { lmb *msgBlock blks []*msgBlock bim map[uint32]*msgBlock - psim map[string]*psi + psim *stree.SubjectTree[psi] tsl int adml int hh hash.Hash64 qch chan struct{} fch chan struct{} fsld chan struct{} + cmu sync.RWMutex cfs []ConsumerStore sips int dirty int @@ -382,7 +384,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim fs := &fileStore{ fcfg: fcfg, - psim: make(map[string]*psi), + psim: stree.NewSubjectTree[psi](), bim: make(map[uint32]*msgBlock), cfg: FileStreamInfo{Created: created, StreamConfig: cfg}, prf: prf, @@ -428,7 +430,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim prior := fs.state // Reset anything that could have been set from above. fs.state = StreamState{} - fs.psim, fs.tsl = make(map[string]*psi), 0 + fs.psim, fs.tsl = fs.psim.Empty(), 0 fs.bim = make(map[uint32]*msgBlock) fs.blks = nil fs.tombs = nil @@ -744,7 +746,10 @@ func (fs *fileStore) setupAEK() error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - if err := os.WriteFile(keyFile, encrypted, defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(keyFile, encrypted, defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } // Set our aek. @@ -775,14 +780,20 @@ func (fs *fileStore) writeStreamMeta() error { b = fs.aek.Seal(nonce, nonce, b, nil) } - if err := os.WriteFile(meta, b, defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(meta, b, defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } fs.hh.Reset() fs.hh.Write(b) checksum := hex.EncodeToString(fs.hh.Sum(nil)) sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) - if err := os.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(sum, []byte(checksum), defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } return nil @@ -847,7 +858,7 @@ const ( // Lock should be held. func (fs *fileStore) noTrackSubjects() bool { - return !(len(fs.psim) > 0 || len(fs.cfg.Subjects) > 0 || fs.cfg.Mirror != nil || len(fs.cfg.Sources) > 0) + return !(fs.psim.Size() > 0 || len(fs.cfg.Subjects) > 0 || fs.cfg.Mirror != nil || len(fs.cfg.Sources) > 0) } // Will init the basics for a message block. @@ -950,7 +961,7 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { // Open up the message file, but we will try to recover from the index file. // We will check that the last checksums match. - file, err := os.Open(mb.mfn) + file, err := mb.openBlock() if err != nil { return nil, err } @@ -1174,11 +1185,16 @@ func (mb *msgBlock) convertCipher() error { // the old keyfile back. if err := fs.genEncryptionKeysForBlock(mb); err != nil { keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)) + <-dios os.WriteFile(keyFile, ekey, defaultFilePerms) + dios <- struct{}{} return err } mb.bek.XORKeyStream(buf, buf) - if err := os.WriteFile(mb.mfn, buf, defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(mb.mfn, buf, defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } return nil @@ -1203,7 +1219,10 @@ func (mb *msgBlock) convertToEncrypted() error { // Undo cache from above for later. mb.cache = nil mb.bek.XORKeyStream(buf, buf) - if err := os.WriteFile(mb.mfn, buf, defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(mb.mfn, buf, defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } return nil @@ -1285,7 +1304,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { if mb.mfd != nil { fd = mb.mfd } else { + <-dios fd, err = os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms) + dios <- struct{}{} if err == nil { defer fd.Close() } @@ -1577,7 +1598,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { // Check for per subject info. if numSubjects := int(readU64()); numSubjects > 0 { - fs.psim, fs.tsl = make(map[string]*psi, numSubjects), 0 + fs.psim, fs.tsl = fs.psim.Empty(), 0 for i := 0; i < numSubjects; i++ { if lsubj := int(readU64()); lsubj > 0 { if bi+lsubj > len(buf) { @@ -1588,23 +1609,23 @@ func (fs *fileStore) recoverFullState() (rerr error) { // If we have lots of subjects this will alloc for each one. // We could reference the underlying buffer, but we could guess wrong if // number of blocks is large and subjects is low, since we would reference buf. - subj := string(buf[bi : bi+lsubj]) + subj := buf[bi : bi+lsubj] // We had a bug that could cause memory corruption in the PSIM that could have gotten stored to disk. // Only would affect subjects, so do quick check. - if !isValidSubject(subj, true) { + if !isValidSubject(string(subj), true) { os.Remove(fn) fs.warn("Stream state corrupt subject detected") return errCorruptState } bi += lsubj - psi := &psi{total: readU64(), fblk: uint32(readU64())} + psi := psi{total: readU64(), fblk: uint32(readU64())} if psi.total > 1 { psi.lblk = uint32(readU64()) } else { psi.lblk = psi.fblk } - fs.psim[subj] = psi - fs.tsl += len(subj) + fs.psim.Insert(subj, psi) + fs.tsl += lsubj } } } @@ -1769,13 +1790,13 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) { fs.state.Msgs++ fs.state.Bytes += fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) if len(sm.subj) > 0 && fs.psim != nil { - if info, ok := fs.psim[sm.subj]; ok { + if info, ok := fs.psim.Find(stringToBytes(sm.subj)); ok { info.total++ if nmb.index > info.lblk { info.lblk = nmb.index } } else { - fs.psim[sm.subj] = &psi{total: 1, fblk: nmb.index, lblk: nmb.index} + fs.psim.Insert(stringToBytes(sm.subj), psi{total: 1, fblk: nmb.index, lblk: nmb.index}) fs.tsl += len(sm.subj) } } @@ -1801,7 +1822,7 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) { // Grabs last checksum for the named block file. // Takes into account encryption etc. func (mb *msgBlock) lastChecksum() []byte { - f, err := os.Open(mb.mfn) + f, err := mb.openBlock() if err != nil { return nil } @@ -1843,7 +1864,9 @@ func (fs *fileStore) cleanupOldMeta() { mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) fs.mu.RUnlock() + <-dios f, err := os.Open(mdir) + dios <- struct{}{} if err != nil { return } @@ -2142,7 +2165,7 @@ func (fs *fileStore) expireMsgsOnRecover() { lmb.writeTombstone(last.seq, last.ts) } // Clear any global subject state. - fs.psim, fs.tsl = make(map[string]*psi), 0 + fs.psim, fs.tsl = fs.psim.Empty(), 0 } // If we purged anything, make sure we kick flush state loop. @@ -2414,6 +2437,9 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { // If past the end no results. if sseq > lseq { + // Make sure we track sequences + ss.First = fs.state.FirstSeq + ss.Last = fs.state.LastSeq return ss } @@ -2458,55 +2484,42 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { return } - tsa := [32]string{} - fsa := [32]string{} - fts := tokenizeSubjectIntoSlice(fsa[:0], filter) - start, stop := uint32(math.MaxUint32), uint32(0) - for subj, psi := range fs.psim { - if isAll { - ss.Msgs += psi.total - } else { - tts := tokenizeSubjectIntoSlice(tsa[:0], subj) - if isSubsetMatchTokenized(tts, fts) { - ss.Msgs += psi.total - // Keep track of start and stop indexes for this subject. - if psi.fblk < start { - start = psi.fblk - } - if psi.lblk > stop { - stop = psi.lblk - } + fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) { + ss.Msgs += psi.total + // Keep track of start and stop indexes for this subject. + if psi.fblk < start { + start = psi.fblk + } + if psi.lblk > stop { + stop = psi.lblk + } + }) + // We do need to figure out the first and last sequences. + wc := subjectHasWildcard(filter) + // Do start + mb := fs.bim[start] + if mb != nil { + _, f, _ := mb.filteredPending(filter, wc, 0) + ss.First = f + } + if ss.First == 0 { + // This is a miss. This can happen since psi.fblk is lazy, but should be very rare. + for i := start + 1; i <= stop; i++ { + mb := fs.bim[i] + if mb == nil { + continue + } + if _, f, _ := mb.filteredPending(filter, wc, 0); f > 0 { + ss.First = f + break } } } - // If not collecting all we do need to figure out the first and last sequences. - if !isAll { - wc := subjectHasWildcard(filter) - // Do start - mb := fs.bim[start] - if mb != nil { - _, f, _ := mb.filteredPending(filter, wc, 0) - ss.First = f - } - if ss.First == 0 { - // This is a miss. This can happen since psi.fblk is lazy, but should be very rare. - for i := start + 1; i <= stop; i++ { - mb := fs.bim[i] - if mb == nil { - continue - } - if _, f, _ := mb.filteredPending(filter, wc, 0); f > 0 { - ss.First = f - break - } - } - } - // Now last - if mb = fs.bim[stop]; mb != nil { - _, _, l := mb.filteredPending(filter, wc, 0) - ss.Last = l - } + // Now last + if mb = fs.bim[stop]; mb != nil { + _, _, l := mb.filteredPending(filter, wc, 0) + ss.Last = l } } @@ -2522,8 +2535,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { start, stop := fs.blks[0], fs.lmb // We can short circuit if not a wildcard using psim for start and stop. if !subjectHasWildcard(subject) { - info := fs.psim[subject] - if info == nil { + info, ok := fs.psim.Find(stringToBytes(subject)) + if !ok { return nil } start, stop = fs.bim[info.fblk], fs.bim[info.lblk] @@ -2604,10 +2617,14 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) wc := subjectHasWildcard(filter) // See if filter was provided but its the only subject. - if !isAll && !wc && len(fs.psim) == 1 && fs.psim[filter] != nil { - isAll = true + if !isAll && !wc && fs.psim.Size() == 1 { + if _, ok := fs.psim.Find(stringToBytes(filter)); ok { + isAll = true + } + } + if isAll && filter == _EMPTY_ { + filter = fwcs } - // If we are isAll and have no deleted we can do a simpler calculation. if !lastPerSubject && isAll && (fs.state.LastSeq-fs.state.FirstSeq+1) == fs.state.Msgs { if sseq == 0 { @@ -2638,7 +2655,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) if lastPerSubject { // If we want all and our start sequence is equal or less than first return number of subjects. if isAll && sseq <= fs.state.FirstSeq { - return uint64(len(fs.psim)), validThrough + return uint64(fs.psim.Size()), validThrough } // If we are here we need to scan. We are going to scan the PSIM looking for lblks that are >= seqStart. // This will build up a list of all subjects from the selected block onward. @@ -2646,20 +2663,19 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb := fs.blks[seqStart] bi := mb.index - for subj, psi := range fs.psim { + fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) { // If the select blk start is greater than entry's last blk skip. if bi > psi.lblk { - continue + return } - if isMatch(subj) { - total++ - // We will track the subjects that are an exact match to the last block. - // This is needed for last block processing. - if psi.lblk == bi { - lbm[subj] = true - } + total++ + // We will track the subjects that are an exact match to the last block. + // This is needed for last block processing. + if psi.lblk == bi { + lbm[string(subj)] = true } - } + }) + // Now check if we need to inspect the seqStart block. // Grab write lock in case we need to load in msgs. mb.mu.Lock() @@ -2768,15 +2784,13 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) // If we are here it's better to calculate totals from psim and adjust downward by scanning less blocks. // TODO(dlc) - Eventually when sublist uses generics, make this sublist driven instead. start := uint32(math.MaxUint32) - for subj, psi := range fs.psim { - if isMatch(subj) { - total += psi.total - // Keep track of start index for this subject. - if psi.fblk < start { - start = psi.fblk - } + fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) { + total += psi.total + // Keep track of start index for this subject. + if psi.fblk < start { + start = psi.fblk } - } + }) // See if we were asked for all, if so we are done. if sseq <= fs.state.FirstSeq { return total, validThrough @@ -2860,30 +2874,14 @@ func (fs *fileStore) SubjectsTotals(filter string) map[string]uint64 { fs.mu.RLock() defer fs.mu.RUnlock() - if len(fs.psim) == 0 { + if fs.psim.Size() == 0 { return nil } - tsa := [32]string{} - fsa := [32]string{} - fts := tokenizeSubjectIntoSlice(fsa[:0], filter) - isAll := filter == _EMPTY_ || filter == fwcs - wc := subjectHasWildcard(filter) - - isMatch := func(subj string) bool { - if !wc { - return subj == filter - } - tts := tokenizeSubjectIntoSlice(tsa[:0], subj) - return isSubsetMatchTokenized(tts, fts) - } - fst := make(map[string]uint64) - for subj, psi := range fs.psim { - if isAll || isMatch(subj) { - fst[subj] = psi.total - } - } + fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) { + fst[string(subj)] = psi.total + }) return fst } @@ -2970,7 +2968,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.hh = hh + <-dios mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms) + dios <- struct{}{} + if err != nil { mb.dirtyCloseWithRemove(true) return nil, fmt.Errorf("Error creating msg block file: %v", err) @@ -3014,7 +3015,10 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - if err := os.WriteFile(keyFile, encrypted, defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(keyFile, encrypted, defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } mb.kfn = keyFile @@ -3033,7 +3037,7 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in var psmc uint64 psmax := mmp > 0 && len(subj) > 0 if psmax { - if info, ok := fs.psim[subj]; ok { + if info, ok := fs.psim.Find(stringToBytes(subj)); ok { psmc = info.total } } @@ -3079,13 +3083,13 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in // Adjust top level tracking of per subject msg counts. if len(subj) > 0 && fs.psim != nil { index := fs.lmb.index - if info, ok := fs.psim[subj]; ok { + if info, ok := fs.psim.Find(stringToBytes(subj)); ok { info.total++ if index > info.lblk { info.lblk = index } } else { - fs.psim[subj] = &psi{total: 1, fblk: index, lblk: index} + fs.psim.Insert(stringToBytes(subj), psi{total: 1, fblk: index, lblk: index}) fs.tsl += len(subj) } } @@ -3112,7 +3116,8 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in if ok, _ := fs.removeMsgViaLimits(fseq); ok { // Make sure we are below the limit. if psmc--; psmc >= mmp { - for info, ok := fs.psim[subj]; ok && info.total > mmp; info, ok = fs.psim[subj] { + bsubj := stringToBytes(subj) + for info, ok := fs.psim.Find(bsubj); ok && info.total > mmp; info, ok = fs.psim.Find(bsubj) { if seq, _ := fs.firstSeqForSubj(subj); seq > 0 { if ok, _ := fs.removeMsgViaLimits(seq); !ok { break @@ -3362,7 +3367,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { // See if we can optimize where we start. start, stop := fs.blks[0].index, fs.lmb.index - if info, ok := fs.psim[subj]; ok { + if info, ok := fs.psim.Find(stringToBytes(subj)); ok { start, stop = info.fblk, info.lblk } @@ -3384,7 +3389,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { if ss := mb.fss[subj]; ss != nil { // Adjust first if it was not where we thought it should be. if i != start { - if info, ok := fs.psim[subj]; ok { + if info, ok := fs.psim.Find(stringToBytes(subj)); ok { info.fblk = i } } @@ -3457,19 +3462,20 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { // collect all that are not correct. needAttention := make(map[string]*psi) - for subj, psi := range fs.psim { + fs.psim.Iter(func(subj []byte, psi *psi) bool { numMsgs += psi.total if psi.total > maxMsgsPer { - needAttention[subj] = psi + needAttention[string(subj)] = psi } - } + return true + }) // We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught. // So do a quick sanity check here. If we detect a skew do a rebuild then re-check. if numMsgs != fs.state.Msgs { fs.warn("Detected skew in subject-based total (%d) vs raw total (%d), rebuilding", numMsgs, fs.state.Msgs) // Clear any global subject state. - fs.psim, fs.tsl = make(map[string]*psi), 0 + fs.psim, fs.tsl = fs.psim.Empty(), 0 for _, mb := range fs.blks { ld, _, err := mb.rebuildState() if err != nil && ld != nil { @@ -3481,11 +3487,12 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { fs.rebuildStateLocked(nil) // Need to redo blocks that need attention. needAttention = make(map[string]*psi) - for subj, psi := range fs.psim { + fs.psim.Iter(func(subj []byte, psi *psi) bool { if psi.total > maxMsgsPer { - needAttention[subj] = psi + needAttention[string(subj)] = psi } - } + return true + }) } // Collect all the msgBlks we alter. @@ -3569,13 +3576,15 @@ func (fs *fileStore) removePerSubject(subj string) { return } // We do not update sense of fblk here but will do so when we resolve during lookup. - if info, ok := fs.psim[subj]; ok { + bsubj := stringToBytes(subj) + if info, ok := fs.psim.Find(bsubj); ok { info.total-- if info.total == 1 { info.fblk = info.lblk } else if info.total == 0 { - delete(fs.psim, subj) - fs.tsl -= len(subj) + if _, ok = fs.psim.Delete(bsubj); ok { + fs.tsl -= len(subj) + } } } } @@ -3902,7 +3911,10 @@ func (mb *msgBlock) compact() { // We will write to a new file and mv/rename it in case of failure. mfn := filepath.Join(mb.fs.fcfg.StoreDir, msgDir, fmt.Sprintf(newScan, mb.index)) - if err := os.WriteFile(mfn, nbuf, defaultFilePerms); err != nil { + <-dios + err := os.WriteFile(mfn, nbuf, defaultFilePerms) + dios <- struct{}{} + if err != nil { os.Remove(mfn) return } @@ -3943,11 +3955,24 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { // Determine record length var rl uint32 - if len(mb.cache.idx) > slot+1 { - ni := mb.cache.idx[slot+1] &^ hbit - rl = ni - ri - } else { + if slot >= len(mb.cache.idx) { rl = mb.cache.lrl + } else { + // Need to account for dbit markers in idx. + // So we will walk until we find valid idx slot to calculate rl. + for i := 1; slot+i < len(mb.cache.idx); i++ { + ni := mb.cache.idx[slot+i] &^ hbit + if ni == dbit { + continue + } + rl = ni - ri + break + } + // check if we had all trailing dbits. + // If so use len of cache buf minus ri. + if rl == 0 { + rl = uint32(len(mb.cache.buf)) - ri + } } if rl < msgHdrSize { return 0, 0, false, errBadMsg @@ -4092,7 +4117,9 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error { // Disk if mb.cache.off+mb.cache.wp > ri { + <-dios mfd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms) + dios <- struct{}{} if err != nil { return err } @@ -4523,7 +4550,7 @@ func (fs *fileStore) checkMsgs() *LostStreamData { fs.checkAndFlushAllBlocks() // Clear any global subject state. - fs.psim, fs.tsl = make(map[string]*psi), 0 + fs.psim, fs.tsl = fs.psim.Empty(), 0 for _, mb := range fs.blks { // Make sure encryption loaded if needed for the block. @@ -4547,7 +4574,9 @@ func (mb *msgBlock) enableForWriting(fip bool) error { if mb.mfd != nil { return nil } + <-dios mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms) + dios <- struct{}{} if err != nil { return fmt.Errorf("error opening msg block file [%q]: %v", mb.mfn, err) } @@ -4817,11 +4846,6 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg } func (mb *msgBlock) recompressOnDiskIfNeeded() error { - // Wait for disk I/O slots to become available. This prevents us from - // running away with system resources. - <-dios - defer func() { dios <- struct{}{} }() - alg := mb.fs.fcfg.Compression mb.mu.Lock() defer mb.mu.Unlock() @@ -4835,7 +4859,10 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // header, in which case we do nothing. // 2. The block will be uncompressed, in which case we will compress it // and then write it back out to disk, reencrypting if necessary. + <-dios origBuf, err := os.ReadFile(origFN) + dios <- struct{}{} + if err != nil { return fmt.Errorf("failed to read original block from disk: %w", err) } @@ -4880,7 +4907,9 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // operation if something goes wrong), create a new temporary file. We will // write out the new block here and then swap the files around afterwards // once everything else has succeeded correctly. + <-dios tmpFD, err := os.OpenFile(tmpFN, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, defaultFilePerms) + dios <- struct{}{} if err != nil { return fmt.Errorf("failed to create temporary file: %w", err) } @@ -4970,7 +4999,7 @@ func (mb *msgBlock) ensureRawBytesLoaded() error { if mb.rbytes > 0 { return nil } - f, err := os.Open(mb.mfn) + f, err := mb.openBlock() if err != nil { return err } @@ -5042,7 +5071,11 @@ func (fs *fileStore) syncBlocks() { // Check if we need to sync. // This is done not holding any locks. if needSync { - if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil { + <-dios + fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms) + dios <- struct{}{} + // If we have an fd. + if fd != nil { canClear := fd.Sync() == nil fd.Close() // Only clear sync flag on success. @@ -5070,7 +5103,10 @@ func (fs *fileStore) syncBlocks() { // Sync state file if we are not running with sync always. if !syncAlways { - if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil { + <-dios + fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms) + dios <- struct{}{} + if fd != nil { fd.Sync() fd.Close() } @@ -5157,10 +5193,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { var index uint32 mbFirstSeq := atomic.LoadUint64(&mb.first.seq) + mbLastSeq := atomic.LoadUint64(&mb.last.seq) // Capture beginning size of dmap. dms := uint64(mb.dmap.Size()) - idxSz := atomic.LoadUint64(&mb.last.seq) - mbFirstSeq + 1 + idxSz := mbLastSeq - mbFirstSeq + 1 if mb.cache == nil { // Approximation, may adjust below. @@ -5185,12 +5222,14 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { } lbuf := uint32(len(buf)) + var seq uint64 for index < lbuf { if index+msgHdrSize > lbuf { return errCorruptState } hdr := buf[index : index+msgHdrSize] - rl, seq, slen := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]), int(le.Uint16(hdr[20:])) + rl, slen := le.Uint32(hdr[0:]), int(le.Uint16(hdr[20:])) + seq = le.Uint64(hdr[4:]) // Clear any headers bit that could be set. rl &^= hbit @@ -5257,6 +5296,19 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { index += rl } + // Track holes at the end of the block, these would be missed in the + // earlier loop if we've ran out of block file to look at, but should + // be easily noticed because the seq will be below the last seq from + // the index. + if seq > 0 && seq < mbLastSeq { + for dseq := seq; dseq < mbLastSeq; dseq++ { + idx = append(idx, dbit) + if dms == 0 { + mb.dmap.Insert(dseq) + } + } + } + mb.cache.buf = buf mb.cache.idx = idx mb.cache.fseq = fseq @@ -5442,6 +5494,16 @@ func (mb *msgBlock) fssLoaded() bool { return mb.fss != nil } +// Wrap openBlock for the gated semaphore processing. +// Lock should be held +func (mb *msgBlock) openBlock() (*os.File, error) { + // Gate with concurrent IO semaphore. + <-dios + f, err := os.Open(mb.mfn) + dios <- struct{}{} + return f, err +} + // Used to load in the block contents. // Lock should be held and all conditionals satisfied prior. func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) { @@ -5456,7 +5518,7 @@ func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) { } if f == nil { var err error - f, err = os.Open(mb.mfn) + f, err = mb.openBlock() if err != nil { if os.IsNotExist(err) { err = errNoBlkData @@ -5665,7 +5727,7 @@ const ( ebit = 1 << 63 // Used for marking tombstone sequences. tbit = 1 << 62 - // Used to mark a bad index as deleted. + // Used to mark an index as deleted and non-existent. dbit = 1 << 30 ) @@ -5890,7 +5952,7 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err wc := subjectHasWildcard(subj) // If literal subject check for presence. if !wc { - if info := fs.psim[subj]; info == nil { + if info, ok := fs.psim.Find(stringToBytes(subj)); !ok { return nil, ErrStoreMsgNotFound } else { start, stop = info.lblk, info.fblk @@ -5960,6 +6022,16 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store start = fs.state.FirstSeq } + // If start is less than or equal to beginning of our stream, meaning our first call, + // let's check the psim to see if we can skip ahead. + if start <= fs.state.FirstSeq { + var ss SimpleState + fs.numFilteredPending(filter, &ss) + if ss.First > start { + start = ss.First + } + } + if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 { for i := bi; i < len(fs.blks); i++ { mb := fs.blks[i] @@ -5987,7 +6059,14 @@ func (fs *fileStore) Type() StorageType { // Returns number of subjects in this store. // Lock should be held. func (fs *fileStore) numSubjects() int { - return len(fs.psim) + return fs.psim.Size() +} + +// numConsumers uses new lock. +func (fs *fileStore) numConsumers() int { + fs.cmu.RLock() + defer fs.cmu.RUnlock() + return len(fs.cfs) } // FastState will fill in state with only the following. @@ -6006,7 +6085,7 @@ func (fs *fileStore) FastState(state *StreamState) { state.NumDeleted = 0 } } - state.Consumers = len(fs.cfs) + state.Consumers = fs.numConsumers() state.NumSubjects = fs.numSubjects() fs.mu.RUnlock() } @@ -6015,7 +6094,7 @@ func (fs *fileStore) FastState(state *StreamState) { func (fs *fileStore) State() StreamState { fs.mu.RLock() state := fs.state - state.Consumers = len(fs.cfs) + state.Consumers = fs.numConsumers() state.NumSubjects = fs.numSubjects() state.Deleted = nil // make sure. @@ -6443,7 +6522,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { fs.lmb = nil fs.bim = make(map[uint32]*msgBlock) // Clear any per subject tracking. - fs.psim, fs.tsl = make(map[string]*psi), 0 + fs.psim, fs.tsl = fs.psim.Empty(), 0 // Mark dirty fs.dirty++ @@ -6454,14 +6533,25 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { // If purge directory still exists then we need to wait // in place and remove since rename would fail. if _, err := os.Stat(pdir); err == nil { + <-dios os.RemoveAll(pdir) + dios <- struct{}{} } - os.Rename(mdir, pdir) - go os.RemoveAll(pdir) + <-dios + os.Rename(mdir, pdir) + dios <- struct{}{} + + go func() { + <-dios + os.RemoveAll(pdir) + dios <- struct{}{} + }() // Create new one. + <-dios os.MkdirAll(mdir, defaultDirPerms) + dios <- struct{}{} // Make sure we have a lmb to write to. if _, err := fs.newMsgBlockForWrite(); err != nil { @@ -6629,7 +6719,10 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { if nbuf, err = smb.cmp.Compress(nbuf); err != nil { goto SKIP } - if err = os.WriteFile(smb.mfn, nbuf, defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(smb.mfn, nbuf, defaultFilePerms) + dios <- struct{}{} + if err != nil { goto SKIP } // Make sure to remove fss state. @@ -6718,7 +6811,7 @@ func (fs *fileStore) reset() error { fs.blks, fs.lmb = nil, nil // Reset subject mappings. - fs.psim, fs.tsl = make(map[string]*psi), 0 + fs.psim, fs.tsl = fs.psim.Empty(), 0 fs.bim = make(map[uint32]*msgBlock) // If we purged anything, make sure we kick flush state loop. @@ -7005,7 +7098,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si // Lock should be held. func (fs *fileStore) resetGlobalPerSubjectInfo() { // Clear any global subject state. - fs.psim, fs.tsl = make(map[string]*psi), 0 + fs.psim, fs.tsl = fs.psim.Empty(), 0 for _, mb := range fs.blks { fs.populateGlobalPerSubjectInfo(mb) } @@ -7096,13 +7189,14 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) { // Now populate psim. for subj, ss := range mb.fss { if len(subj) > 0 { - if info, ok := fs.psim[subj]; ok { + bsubj := stringToBytes(subj) + if info, ok := fs.psim.Find(bsubj); ok { info.total += ss.Msgs if mb.index > info.lblk { info.lblk = mb.index } } else { - fs.psim[subj] = &psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index} + fs.psim.Insert(bsubj, psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index}) fs.tsl += len(subj) } } @@ -7312,7 +7406,7 @@ func (fs *fileStore) writeFullState() error { } // For calculating size. - numSubjects := len(fs.psim) + numSubjects := fs.psim.Size() // Calculate and estimate of the uper bound on the size to avoid multiple allocations. sz := 2 + // Magic and Version @@ -7324,11 +7418,12 @@ func (fs *fileStore) writeFullState() error { binary.MaxVarintLen64 + 8 + 8 // last index + record checksum + full state checksum // Do 4k on stack if possible. - var raw [4 * 1024]byte + const ssz = 4 * 1024 var buf []byte - if sz <= cap(raw) { - buf, sz = raw[0:2:cap(raw)], cap(raw) + if sz <= ssz { + var _buf [ssz]byte + buf, sz = _buf[0:2:ssz], ssz } else { buf = make([]byte, hdrLen, sz) } @@ -7344,7 +7439,7 @@ func (fs *fileStore) writeFullState() error { // Do per subject information map if applicable. buf = binary.AppendUvarint(buf, uint64(numSubjects)) if numSubjects > 0 { - for subj, psi := range fs.psim { + fs.psim.Match([]byte(fwcs), func(subj []byte, psi *psi) { buf = binary.AppendUvarint(buf, uint64(len(subj))) buf = append(buf, subj...) buf = binary.AppendUvarint(buf, psi.total) @@ -7352,7 +7447,7 @@ func (fs *fileStore) writeFullState() error { if psi.total > 1 { buf = binary.AppendUvarint(buf, uint64(psi.lblk)) } - } + }) } // Now walk all blocks and write out first and last and optional dmap encoding. @@ -7521,11 +7616,13 @@ func (fs *fileStore) stop(writeState bool) error { // We should update the upper usage layer on a stop. cb, bytes := fs.scb, int64(fs.state.Bytes) + fs.mu.Unlock() + fs.cmu.Lock() var _cfs [256]ConsumerStore cfs := append(_cfs[:0], fs.cfs...) fs.cfs = nil - fs.mu.Unlock() + fs.cmu.Unlock() for _, o := range cfs { o.Stop() @@ -7673,9 +7770,9 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ } // Do consumers' state last. - fs.mu.RLock() + fs.cmu.RLock() cfs := fs.cfs - fs.mu.RUnlock() + fs.cmu.RUnlock() for _, cs := range cfs { o, ok := cs.(*consumerFileStore) @@ -8018,7 +8115,10 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt // Redo the state file as well here if we have one and we can tell it was plaintext. if buf, err := os.ReadFile(o.ifn); err == nil { if _, err := decodeConsumerState(buf); err == nil { - if err := os.WriteFile(o.ifn, o.encryptState(buf), defaultFilePerms); err != nil { + <-dios + err := os.WriteFile(o.ifn, o.encryptState(buf), defaultFilePerms) + dios <- struct{}{} + if err != nil { if didCreate { os.RemoveAll(odir) } @@ -8508,7 +8608,10 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - if err := os.WriteFile(keyFile, encrypted, defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(keyFile, encrypted, defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } } @@ -8524,14 +8627,21 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { b = cfs.aek.Seal(nonce, nonce, b, nil) } - if err := os.WriteFile(meta, b, defaultFilePerms); err != nil { + <-dios + err = os.WriteFile(meta, b, defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } cfs.hh.Reset() cfs.hh.Write(b) checksum := hex.EncodeToString(cfs.hh.Sum(nil)) sum := filepath.Join(cfs.odir, JetStreamMetaFileSum) - if err := os.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { + + <-dios + err = os.WriteFile(sum, []byte(checksum), defaultFilePerms) + dios <- struct{}{} + if err != nil { return err } return nil @@ -8617,7 +8727,10 @@ func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, er } // Read the state in here from disk.. + <-dios buf, err := os.ReadFile(o.ifn) + dios <- struct{}{} + if err != nil && !os.IsNotExist(err) { return nil, err } @@ -8883,15 +8996,15 @@ func (o *consumerFileStore) delete(streamDeleted bool) error { } func (fs *fileStore) AddConsumer(o ConsumerStore) error { - fs.mu.Lock() - defer fs.mu.Unlock() + fs.cmu.Lock() + defer fs.cmu.Unlock() fs.cfs = append(fs.cfs, o) return nil } func (fs *fileStore) RemoveConsumer(o ConsumerStore) error { - fs.mu.Lock() - defer fs.mu.Unlock() + fs.cmu.Lock() + defer fs.cmu.Unlock() for i, cfs := range fs.cfs { if o == cfs { fs.cfs = append(fs.cfs[:i], fs.cfs[i+1:]...) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go index 6b333878ab..cba11d14ef 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2023 The NATS Authors +// Copyright 2019-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -653,9 +653,15 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error { } if !a.serviceImportExists(jsAllAPI) { - if err := a.AddServiceImport(s.SystemAccount(), jsAllAPI, _EMPTY_); err != nil { + // Capture si so we can turn on implicit sharing with JetStream layer. + // Make sure to set "to" otherwise will incur performance slow down. + si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, jsAllAPI, nil) + if err != nil { return fmt.Errorf("Error setting up jetstream service imports for account: %v", err) } + a.mu.Lock() + si.share = true + a.mu.Unlock() } // Check if we have a Domain specified. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go index 76c1c2a7ba..14369d6c0e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go @@ -334,8 +334,8 @@ func generateJSMappingTable(domain string) map[string]string { // JSMaxDescription is the maximum description length for streams and consumers. const JSMaxDescriptionLen = 4 * 1024 -// JSMaxMetadataLen is the maximum length for streams an consumers metadata map. -// It's calculated by summing length of all keys an values. +// JSMaxMetadataLen is the maximum length for streams and consumers metadata map. +// It's calculated by summing length of all keys and values. const JSMaxMetadataLen = 128 * 1024 // JSMaxNameLen is the maximum name lengths for streams, consumers and templates. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 770b9957e7..e7756cd114 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -1,4 +1,4 @@ -// Copyright 2020-2023 The NATS Authors +// Copyright 2020-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -432,10 +432,10 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { } // Restart the stream in question. -// Should only be called when the stream is know in a bad state. +// Should only be called when the stream is known to be in a bad state. func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) { js.mu.Lock() - cc := js.cluster + s, cc := js.srv, js.cluster if cc == nil { js.mu.Unlock() return @@ -458,9 +458,18 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) { } rg.node = nil } + sinceCreation := time.Since(sa.Created) js.mu.Unlock() // Process stream assignment to recreate. + // Check that we have given system enough time to start us up. + // This will be longer than obvious, and matches consumer logic in case system very busy. + if sinceCreation < 10*time.Second { + s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v", + acc, csa.Config.Name, sinceCreation) + return + } + js.processStreamAssignment(sa) // If we had consumers assigned to this server they will be present in the copy, csa. @@ -569,13 +578,24 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum // When we try to restart we nil out the node if applicable // and reprocess the consumer assignment. restartConsumer := func() { + mset.mu.RLock() + accName, streamName := mset.acc.GetName(), mset.cfg.Name + mset.mu.RUnlock() + js.mu.Lock() + deleted := ca.deleted + // Check that we have not just been created. + if !deleted && time.Since(ca.Created) < 10*time.Second { + s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v", + accName, streamName, consumer, time.Since(ca.Created)) + js.mu.Unlock() + return + } // Make sure the node is stopped if still running. if node != nil && node.State() != Closed { node.Stop() } ca.Group.node = nil - deleted := ca.deleted js.mu.Unlock() if !deleted { js.processConsumerAssignment(ca) @@ -597,7 +617,7 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum mset.mu.RLock() accName, streamName := mset.acc.GetName(), mset.cfg.Name mset.mu.RUnlock() - s.Warnf("Detected consumer cluster node skew '%s > %s'", accName, streamName, consumer) + s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer) node.Delete() o.deleteWithoutAdvisory() restartConsumer() @@ -1123,6 +1143,7 @@ func (js *jetStream) checkForOrphans() { for accName, jsa := range js.accounts { asa := cc.streams[accName] + jsa.mu.RLock() for stream, mset := range jsa.streams { if sa := asa[stream]; sa == nil { streams = append(streams, mset) @@ -1136,6 +1157,7 @@ func (js *jetStream) checkForOrphans() { } } } + jsa.mu.RUnlock() } js.mu.Unlock() @@ -1356,7 +1378,7 @@ func (js *jetStream) monitorCluster() { case isLeader = <-lch: // For meta layer synchronize everyone to our state on becoming leader. - if isLeader { + if isLeader && n.ApplyQ().len() == 0 { n.SendSnapshot(js.metaSnapshot()) } // Process the change. @@ -2233,7 +2255,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil { lastState, lastSnapTime = curState, time.Now() - } else if err != errNoSnapAvailable && err != errNodeClosed { + } else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning { s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } } @@ -2298,7 +2320,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps if mset.numConsumers() >= numExpectedConsumers { break } - time.Sleep(sleepTime) + select { + case <-s.quitCh: + return + case <-time.After(sleepTime): + } } if actual := mset.numConsumers(); actual < numExpectedConsumers { s.Warnf("All consumers not online for '%s > %s': expected %d but only have %d", accName, mset.name(), numExpectedConsumers, actual) @@ -2311,7 +2337,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // We can arrive here NOT being the leader, so we send the snapshot only if we are, and in this case // reset the notion that we need to send the snapshot. If we are not, then the first time the server // will switch to leader (in the loop below), we will send the snapshot. - if sendSnapshot && isLeader && mset != nil && n != nil { + if sendSnapshot && isLeader && mset != nil && n != nil && !isRecovering { n.SendSnapshot(mset.stateSnapshot()) sendSnapshot = false } @@ -2331,9 +2357,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // No special processing needed for when we are caught up on restart. if ce == nil { isRecovering = false - // Check on startup if we should snapshot/compact. - if _, b := n.Size(); b > compactSizeMin || n.NeedSnapshot() { - doSnapshot() + // Make sure we create a new snapshot in case things have changed such that any existing + // snapshot may no longer be valid. + doSnapshot() + // If we became leader during this time and we need to send a snapshot to our + // followers, i.e. as a result of a scale-up from R1, do it now. + if sendSnapshot && isLeader && mset != nil && n != nil { + n.SendSnapshot(mset.stateSnapshot()) + sendSnapshot = false } continue } @@ -2374,7 +2405,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps case isLeader = <-lch: if isLeader { - if mset != nil && n != nil && sendSnapshot { + if mset != nil && n != nil && sendSnapshot && !isRecovering { + // If we *are* recovering at the time then this will get done when the apply queue + // handles the nil guard to show the catchup ended. n.SendSnapshot(mset.stateSnapshot()) sendSnapshot = false } @@ -2841,6 +2874,19 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // Process the actual message here. if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil { + if err == errLastSeqMismatch { + var state StreamState + mset.store.FastState(&state) + // If we have no msgs and the other side is delivering us a sequence past where we + // should be reset. This is possible if the other side has a stale snapshot and no longer + // has those messages. So compact and retry to reset. + if state.Msgs == 0 { + mset.store.Compact(lseq + 1) + // Retry + err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts) + } + } + // Only return in place if we are going to reset our stream or we are out of space, or we are closed. if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed { return err @@ -4086,6 +4132,7 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { // Make sure this removal is for what we have, otherwise ignore. if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name { needDelete = true + oca.deleted = true delete(sa.consumers, ca.Name) } } @@ -4366,7 +4413,6 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb recovering := ca.recovering js.mu.RUnlock() - stopped := false var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} var err error var acc *Account @@ -4376,9 +4422,13 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb if mset, _ := acc.lookupStream(ca.Stream); mset != nil { if o := mset.lookupConsumer(ca.Name); o != nil { err = o.stopWithFlags(true, false, true, wasLeader) - stopped = true } } + } else if ca.Group != nil { + // We have a missing account, see if we can cleanup. + if sacc := s.SystemAccount(); sacc != nil { + os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name)) + } } // Always delete the node if present. @@ -4386,15 +4436,6 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb node.Delete() } - // This is a stop gap cleanup in case - // 1) the account does not exist (and mset consumer couldn't be stopped) and/or - // 2) node was nil (and couldn't be deleted) - if !stopped || node == nil { - if sacc := s.SystemAccount(); sacc != nil { - os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name)) - } - } - if !wasLeader || ca.Reply == _EMPTY_ { if !(offline && isMetaLeader) { return @@ -4595,8 +4636,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else if err != errNoSnapAvailable && err != errNodeClosed { - s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err) + } else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning { + s.RateLimitWarnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err) } } } @@ -4658,16 +4699,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { js.setConsumerAssignmentRecovering(ca) } - // Synchronize everyone to our state. - if isLeader && n != nil { - // Only send out if we have state. - if _, _, applied := n.Progress(); applied > 0 { - if snap, err := o.store.EncodedState(); err == nil { - n.SendSnapshot(snap) - } - } - } - // Process the change. if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { doSnapshot(true) @@ -7789,6 +7820,7 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) { mset.store.FastState(&state) mset.setCLFS(snap.Failed) sreq := mset.calculateSyncRequest(&state, snap) + s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name) mset.mu.Unlock() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go index ab7c61915e..7ea14ef858 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go @@ -18,7 +18,7 @@ import ( "time" ) -func (s *Server) publishAdvisory(acc *Account, subject string, adv interface{}) { +func (s *Server) publishAdvisory(acc *Account, subject string, adv any) { if acc == nil { acc = s.SystemAccount() if acc == nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index 8d324d01f7..6abf0f005e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -22,6 +22,7 @@ import ( "time" "github.com/nats-io/nats-server/v2/server/avl" + "github.com/nats-io/nats-server/v2/server/stree" ) // TODO(dlc) - This is a fairly simplistic approach but should do for now. @@ -30,7 +31,7 @@ type memStore struct { cfg StreamConfig state StreamState msgs map[uint64]*StoreMsg - fss map[string]*SimpleState + fss *stree.SubjectTree[SimpleState] dmap avl.SequenceSet maxp int64 scb StorageUpdateHandler @@ -48,7 +49,7 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) { } ms := &memStore{ msgs: make(map[uint64]*StoreMsg), - fss: make(map[string]*SimpleState), + fss: stree.NewSubjectTree[SimpleState](), maxp: cfg.MaxMsgsPer, cfg: *cfg, } @@ -88,11 +89,12 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { // If the value is smaller we need to enforce that. if ms.maxp != 0 && ms.maxp < maxp { lm := uint64(ms.maxp) - for subj, ss := range ms.fss { + ms.fss.Iter(func(subj []byte, ss *SimpleState) bool { if ss.Msgs > lm { - ms.enforcePerSubjectLimit(subj, ss) + ms.enforcePerSubjectLimit(bytesToString(subj), ss) } - } + return true + }) } ms.mu.Unlock() @@ -113,7 +115,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int var ss *SimpleState var asl bool if len(subj) > 0 { - if ss = ms.fss[subj]; ss != nil { + var ok bool + if ss, ok = ms.fss.Find(stringToBytes(subj)); ok { asl = ms.maxp > 0 && ss.Msgs >= uint64(ms.maxp) } } @@ -191,7 +194,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int ms.enforcePerSubjectLimit(subj, ss) } } else { - ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + ms.fss.Insert([]byte(subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } } @@ -363,14 +366,17 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje return ss } - isAll := filter == _EMPTY_ || filter == fwcs + if filter == _EMPTY_ { + filter = fwcs + } + isAll := filter == fwcs // First check if we can optimize this part. // This means we want all and the starting sequence was before this block. if isAll && sseq <= ms.state.FirstSeq { total := ms.state.Msgs if lastPerSubject { - total = uint64(len(ms.fss)) + total = uint64(ms.fss.Size()) } return SimpleState{ Msgs: total, @@ -415,21 +421,20 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje var havePartial bool // We will track start and end sequences as we go. - for subj, fss := range ms.fss { - if isMatch(subj) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, fss.First, fss) - } - if sseq <= fss.First { - update(fss) - } else if sseq <= fss.Last { - // We matched but its a partial. - havePartial = true - // Don't break here, we will update to keep tracking last. - update(fss) - } + ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { + subjs := bytesToString(subj) + if fss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subjs, fss.First, fss) } - } + if sseq <= fss.First { + update(fss) + } else if sseq <= fss.Last { + // We matched but its a partial. + havePartial = true + // Don't break here, we will update to keep tracking last. + update(fss) + } + }) // If we did not encounter any partials we can return here. if !havePartial { @@ -476,7 +481,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje for seq := ms.state.FirstSeq; seq < first; seq++ { if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) { if lastPerSubject { - tss = ms.fss[sm.subj] + tss, _ = ms.fss.Find(stringToBytes(sm.subj)) } // If we are last per subject, make sure to only adjust if all messages are before our first. if tss == nil || tss.Last < first { @@ -515,26 +520,29 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { ms.mu.RLock() defer ms.mu.RUnlock() - if len(ms.fss) == 0 { + if ms.fss.Size() == 0 { return nil } - fss := make(map[string]SimpleState) - for subj, ss := range ms.fss { - if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) { - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) - } - oss := fss[subj] - if oss.First == 0 { // New - fss[subj] = *ss - } else { - // Merge here. - oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs - fss[subj] = oss - } - } + if subject == _EMPTY_ { + subject = fwcs } + + fss := make(map[string]SimpleState) + ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) { + subjs := string(subj) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subjs, ss.First, ss) + } + oss := fss[subjs] + if oss.First == 0 { // New + fss[subjs] = *ss + } else { + // Merge here. + oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs + fss[subjs] = oss + } + }) return fss } @@ -543,7 +551,7 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 { ms.mu.RLock() defer ms.mu.RUnlock() - if len(ms.fss) == 0 { + if ms.fss.Size() == 0 { return nil } @@ -553,15 +561,16 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 { isAll := filterSubject == _EMPTY_ || filterSubject == fwcs fst := make(map[string]uint64) - for subj, ss := range ms.fss { + ms.fss.Match(stringToBytes(filterSubject), func(subj []byte, ss *SimpleState) { + subjs := string(subj) if isAll { - fst[subj] = ss.Msgs + fst[subjs] = ss.Msgs } else { - if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) { - fst[subj] = ss.Msgs + if tts := tokenizeSubjectIntoSlice(tsa[:0], subjs); isSubsetMatchTokenized(tts, fts) { + fst[subjs] = ss.Msgs } } - } + }) return fst } @@ -755,7 +764,7 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) { ms.state.Bytes = 0 ms.state.Msgs = 0 ms.msgs = make(map[uint64]*StoreMsg) - ms.fss = make(map[string]*SimpleState) + ms.fss = stree.NewSubjectTree[SimpleState]() ms.mu.Unlock() if cb != nil { @@ -846,7 +855,7 @@ func (ms *memStore) reset() error { ms.state.Bytes = 0 // Reset msgs and fss. ms.msgs = make(map[uint64]*StoreMsg) - ms.fss = make(map[string]*SimpleState) + ms.fss = stree.NewSubjectTree[SimpleState]() ms.mu.Unlock() @@ -950,7 +959,8 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error if subject == _EMPTY_ || subject == fwcs { sm, ok = ms.msgs[ms.state.LastSeq] } else if subjectIsLiteral(subject) { - if ss := ms.fss[subject]; ss != nil && ss.Msgs > 0 { + var ss *SimpleState + if ss, ok = ms.fss.Find(stringToBytes(subject)); ok && ss.Msgs > 0 { sm, ok = ms.msgs[ss.Last] } } else if ss := ms.filteredStateLocked(1, subject, true); ss.Msgs > 0 { @@ -982,12 +992,15 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store return nil, ms.state.LastSeq, ErrStoreEOF } - isAll := filter == _EMPTY_ || filter == fwcs + if filter == _EMPTY_ { + filter = fwcs + } + isAll := filter == fwcs // Skip scan of ms.fss is number of messages in the block are less than // 1/2 the number of subjects in ms.fss. Or we have a wc and lots of fss entries. const linearScanMaxFSS = 256 - doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < len(ms.fss) || (wc && len(ms.fss) > linearScanMaxFSS) + doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < ms.fss.Size() || (wc && ms.fss.Size() > linearScanMaxFSS) // Initial setup. fseq, lseq := start, ms.state.LastSeq @@ -996,16 +1009,14 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store subs := []string{filter} if wc || isAll { subs = subs[:0] - for fsubj := range ms.fss { - if isAll || subjectIsSubsetMatch(fsubj, filter) { - subs = append(subs, fsubj) - } - } + ms.fss.Match(stringToBytes(filter), func(subj []byte, val *SimpleState) { + subs = append(subs, string(subj)) + }) } fseq, lseq = ms.state.LastSeq, uint64(0) for _, subj := range subs { - ss := ms.fss[subj] - if ss == nil { + ss, ok := ms.fss.Find(stringToBytes(subj)) + if !ok { continue } if ss.firstNeedsUpdate { @@ -1093,12 +1104,12 @@ func (ms *memStore) updateFirstSeq(seq uint64) { // Remove a seq from the fss and select new first. // Lock should be held. func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { - ss := ms.fss[subj] - if ss == nil { + ss, ok := ms.fss.Find(stringToBytes(subj)) + if !ok { return } if ss.Msgs == 1 { - delete(ms.fss, subj) + ms.fss.Delete(stringToBytes(subj)) return } ss.Msgs-- @@ -1198,7 +1209,7 @@ func (ms *memStore) FastState(state *StreamState) { } } state.Consumers = ms.consumers - state.NumSubjects = len(ms.fss) + state.NumSubjects = ms.fss.Size() ms.mu.RUnlock() } @@ -1208,7 +1219,7 @@ func (ms *memStore) State() StreamState { state := ms.state state.Consumers = ms.consumers - state.NumSubjects = len(ms.fss) + state.NumSubjects = ms.fss.Size() state.Deleted = nil // Calculate interior delete details. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go b/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go index e82ddb293e..793d57d273 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go @@ -1439,26 +1439,31 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc } } + // Opportunistically delete the old (legacy) consumer, from v2.10.10 and + // before. Ignore any errors that might arise. + rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id + jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName) + // Using ephemeral consumer is too risky because if this server were to be // disconnected from the rest for few seconds, then the leader would remove // the consumer, so even after a reconnect, we would no longer receive - // retained messages. Delete any existing durable that we have for that - // and recreate here. - // The name for the durable is $MQTT_rmsgs_ (which is jsa.id) - rmDurName := mqttRetainedMsgsStreamName + "_" + jsa.id - // If error other than "not found" then fail, otherwise proceed with creating - // the durable consumer. - if _, err := jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmDurName); isErrorOtherThan(err, JSConsumerNotFoundErr) { - return nil, err - } + // retained messages. + // + // So we use a durable consumer, and create a new one each time we start. + // The old one should expire and get deleted due to inactivity. The name for + // the durable is $MQTT_rmsgs_{uuid}_{server-name}, the server name is just + // for readability. + rmDurName := mqttRetainedMsgsStreamName + "_" + nuid.Next() + "_" + s.String() + ccfg := &CreateConsumerRequest{ Stream: mqttRetainedMsgsStreamName, Config: ConsumerConfig{ - Durable: rmDurName, - FilterSubject: mqttRetainedMsgsStreamSubject + ">", - DeliverSubject: rmsubj, - ReplayPolicy: ReplayInstant, - AckPolicy: AckNone, + Durable: rmDurName, + FilterSubject: mqttRetainedMsgsStreamSubject + ">", + DeliverSubject: rmsubj, + ReplayPolicy: ReplayInstant, + AckPolicy: AckNone, + InactiveThreshold: 5 * time.Minute, }, } if _, err := jsa.createConsumer(ccfg); err != nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/opts.go b/vendor/github.com/nats-io/nats-server/v2/server/opts.go index 8925ef6df1..721d6be843 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/opts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/opts.go @@ -80,6 +80,8 @@ type ClusterOpts struct { PoolSize int `json:"-"` PinnedAccounts []string `json:"-"` Compression CompressionOpts `json:"-"` + PingInterval time.Duration `json:"-"` + MaxPingsOut int `json:"-"` // Not exported (used in tests) resolver netResolver @@ -1755,6 +1757,13 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err *errors = append(*errors, err) continue } + case "ping_interval": + opts.Cluster.PingInterval = parseDuration("ping_interval", tk, mv, errors, warnings) + if opts.Cluster.PingInterval > routeMaxPingInterval { + *warnings = append(*warnings, &configErr{tk, fmt.Sprintf("Cluster 'ping_interval' will reset to %v which is the max for routes", routeMaxPingInterval)}) + } + case "ping_max": + opts.Cluster.MaxPingsOut = int(mv.(int64)) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 6990812c47..f041fb0898 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -154,11 +154,12 @@ type raft struct { llqrt time.Time // Last quorum lost time lsut time.Time // Last scale-up time - term uint64 // The current vote term - pterm uint64 // Previous term from the last snapshot - pindex uint64 // Previous index from the last snapshot - commit uint64 // Sequence number of the most recent commit - applied uint64 // Sequence number of the most recently applied commit + term uint64 // The current vote term + pterm uint64 // Previous term from the last snapshot + pindex uint64 // Previous index from the last snapshot + commit uint64 // Sequence number of the most recent commit + applied uint64 // Sequence number of the most recently applied commit + hcbehind bool // Were we falling behind at the last health check? (see: isCurrent) leader string // The ID of the leader vote string // Our current vote state @@ -1052,7 +1053,11 @@ func (n *raft) InstallSnapshot(data []byte) error { sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex) sfile := filepath.Join(snapDir, sn) - if err := os.WriteFile(sfile, n.encodeSnapshot(snap), defaultFilePerms); err != nil { + <-dios + err := os.WriteFile(sfile, n.encodeSnapshot(snap), defaultFilePerms) + dios <- struct{}{} + + if err != nil { n.Unlock() // We could set write err here, but if this is a temporary situation, too many open files etc. // we want to retry and snapshots are not fatal. @@ -1187,7 +1192,11 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) { if n.snapfile == _EMPTY_ { return nil, errNoSnapAvailable } + + <-dios buf, err := os.ReadFile(n.snapfile) + dios <- struct{}{} + if err != nil { n.warn("Error reading snapshot: %v", err) os.Remove(n.snapfile) @@ -1269,8 +1278,18 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { return false } + // If we were previously logging about falling behind, also log when the problem + // was cleared. + clearBehindState := func() { + if n.hcbehind { + n.warn("Health check OK, no longer falling behind") + n.hcbehind = false + } + } + // Make sure we are the leader or we know we have heard from the leader recently. if n.State() == Leader { + clearBehindState() return true } @@ -1294,6 +1313,7 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { if n.commit == n.applied { // At this point if we are current, we can return saying so. + clearBehindState() return true } else if !includeForwardProgress { // Otherwise, if we aren't allowed to include forward progress @@ -1311,11 +1331,13 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { n.Lock() if n.commit-n.applied < startDelta { // The gap is getting smaller, so we're making forward progress. + clearBehindState() return true } } } + n.hcbehind = true n.warn("Falling behind in health check, commit %d != applied %d", n.commit, n.applied) return false } @@ -3678,14 +3700,22 @@ func writePeerState(sd string, ps *peerState) error { if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) { return err } - if err := os.WriteFile(psf, encodePeerState(ps), defaultFilePerms); err != nil { + + <-dios + err := os.WriteFile(psf, encodePeerState(ps), defaultFilePerms) + dios <- struct{}{} + + if err != nil { return err } return nil } func readPeerState(sd string) (ps *peerState, err error) { + <-dios buf, err := os.ReadFile(filepath.Join(sd, peerStateFile)) + dios <- struct{}{} + if err != nil { return nil, err } @@ -3698,7 +3728,10 @@ const termVoteLen = idLen + 8 // readTermVote will read the largest term and who we voted from to stable storage. // Lock should be held. func (n *raft) readTermVote() (term uint64, voted string, err error) { + <-dios buf, err := os.ReadFile(filepath.Join(n.sd, termVoteFile)) + dios <- struct{}{} + if err != nil { return 0, noVote, err } @@ -3920,6 +3953,10 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { n.resetElect(randCampaignTimeout()) } } + + // Term might have changed, make sure response has the most current + vresp.term = n.term + n.Unlock() n.sendReply(vr.reply, vresp.encode()) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/route.go b/vendor/github.com/nats-io/nats-server/v2/server/route.go index e73f1bdf72..f8a8623d6a 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/route.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/route.go @@ -1780,7 +1780,15 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie // the connection as stale based on the ping interval and max out values, // but without actually sending pings. if compressionConfigured { - c.ping.tmr = time.AfterFunc(opts.PingInterval*time.Duration(opts.MaxPingsOut+1), func() { + pingInterval := opts.PingInterval + pingMax := opts.MaxPingsOut + if opts.Cluster.PingInterval > 0 { + pingInterval = opts.Cluster.PingInterval + } + if opts.Cluster.MaxPingsOut > 0 { + pingMax = opts.MaxPingsOut + } + c.ping.tmr = time.AfterFunc(pingInterval*time.Duration(pingMax+1), func() { c.mu.Lock() c.Debugf("Stale Client Connection - Closing") c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection"))) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index f032f285b2..b2bef9a862 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -637,9 +637,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt mset.store.FastState(&state) // Possible race with consumer.setLeader during recovery. - mset.mu.Lock() + mset.mu.RLock() mset.lseq = state.LastSeq - mset.mu.Unlock() + mset.mu.RUnlock() // If no msgs (new stream), set dedupe state loaded to true. if state.Msgs == 0 { @@ -5763,7 +5763,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return mset, nil } -// This is to check for dangling messages on interest retention streams. +// This is to check for dangling messages on interest retention streams. Only called on account enable. // Issue https://github.com/nats-io/nats-server/issues/3612 func (mset *stream) checkForOrphanMsgs() { mset.mu.RLock() @@ -5771,9 +5771,23 @@ func (mset *stream) checkForOrphanMsgs() { for _, o := range mset.consumers { consumers = append(consumers, o) } + accName, stream := mset.acc.Name, mset.cfg.Name + + var ss StreamState + mset.store.FastState(&ss) mset.mu.RUnlock() + for _, o := range consumers { - o.checkStateForInterestStream() + if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq { + o.mu.RLock() + s, consumer := o.srv, o.name + state, _ := o.store.State() + asflr := state.AckFloor.Stream + o.mu.RUnlock() + // Warn about stream state vs our ack floor. + s.RateLimitWarnf("Detected consumer '%s > %s > %s' ack floor %d is ahead of stream's last sequence %d", + accName, stream, consumer, asflr, ss.LastSeq) + } } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/dump.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/dump.go new file mode 100644 index 0000000000..79b6fd99f8 --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/dump.go @@ -0,0 +1,68 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +import ( + "fmt" + "io" + "strings" +) + +// For dumping out a text representation of a tree. +func (t *SubjectTree[T]) Dump(w io.Writer) { + t.dump(w, t.root, 0) + fmt.Fprintln(w) +} + +// Will dump out a node. +func (t *SubjectTree[T]) dump(w io.Writer, n node, depth int) { + if n == nil { + fmt.Fprintf(w, "EMPTY\n") + return + } + if n.isLeaf() { + leaf := n.(*leaf[T]) + fmt.Fprintf(w, "%s LEAF: Suffix: %q Value: %+v\n", dumpPre(depth), leaf.suffix, leaf.value) + n = nil + } else { + // We are a node type here, grab meta portion. + bn := n.base() + fmt.Fprintf(w, "%s %s Prefix: %q\n", dumpPre(depth), n.kind(), bn.prefix[:bn.prefixLen]) + depth++ + n.iter(func(n node) bool { + t.dump(w, n, depth) + return true + }) + } +} + +// For individual node/leaf dumps. +func (n *leaf[T]) kind() string { return "LEAF" } +func (n *node4) kind() string { return "NODE4" } +func (n *node16) kind() string { return "NODE16" } +func (n *node256) kind() string { return "NODE256" } + +// Calculates the indendation, etc. +func dumpPre(depth int) string { + if depth == 0 { + return "-- " + } else { + var b strings.Builder + for i := 0; i < depth; i++ { + b.WriteString(" ") + } + b.WriteString("|__ ") + return b.String() + } +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/leaf.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/leaf.go new file mode 100644 index 0000000000..839450f2e4 --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/leaf.go @@ -0,0 +1,50 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +import ( + "bytes" +) + +// Leaf node +type leaf[T any] struct { + // This could be the whole subject, but most likely just the suffix portion. + // We will only store the suffix here and assume all prior prefix paths have + // been checked once we arrive at this leafnode. + suffix []byte + value T +} + +func newLeaf[T any](suffix []byte, value T) *leaf[T] { + return &leaf[T]{copyBytes(suffix), value} +} + +func (n *leaf[T]) isLeaf() bool { return true } +func (n *leaf[T]) base() *meta { return nil } +func (n *leaf[T]) match(subject []byte) bool { return bytes.Equal(subject, n.suffix) } +func (n *leaf[T]) setSuffix(suffix []byte) { n.suffix = copyBytes(suffix) } +func (n *leaf[T]) isFull() bool { return true } +func (n *leaf[T]) matchParts(parts [][]byte) ([][]byte, bool) { return matchParts(parts, n.suffix) } +func (n *leaf[T]) iter(f func(node) bool) {} +func (n *leaf[T]) children() []node { return nil } +func (n *leaf[T]) numChildren() uint16 { return 0 } +func (n *leaf[T]) path() []byte { return n.suffix } + +// Not applicable to leafs and should not be called, so panic if we do. +func (n *leaf[T]) setPrefix(pre []byte) { panic("setPrefix called on leaf") } +func (n *leaf[T]) addChild(_ byte, _ node) { panic("addChild called on leaf") } +func (n *leaf[T]) findChild(_ byte) *node { panic("findChild called on leaf") } +func (n *leaf[T]) grow() node { panic("grow called on leaf") } +func (n *leaf[T]) deleteChild(_ byte) { panic("deleteChild called on leaf") } +func (n *leaf[T]) shrink() node { panic("shrink called on leaf") } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/node.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/node.go new file mode 100644 index 0000000000..0afecfab9d --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/node.go @@ -0,0 +1,45 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +// Internal node interface. +type node interface { + isLeaf() bool + base() *meta + setPrefix(pre []byte) + addChild(c byte, n node) + findChild(c byte) *node + deleteChild(c byte) + isFull() bool + grow() node + shrink() node + matchParts(parts [][]byte) ([][]byte, bool) + kind() string + iter(f func(node) bool) + children() []node + numChildren() uint16 + path() []byte +} + +// Maximum prefix len +// We expect the most savings to come from long shared prefixes. +// This also makes the meta base layer exactly 64 bytes, a normal L1 cache line. +const maxPrefixLen = 60 + +// 64 bytes total - an L1 cache line. +type meta struct { + prefix [maxPrefixLen]byte + prefixLen uint16 + size uint16 +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/node16.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/node16.go new file mode 100644 index 0000000000..b61f286dcd --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/node16.go @@ -0,0 +1,121 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +// Node with 16 children +type node16 struct { + meta + child [16]node + key [16]byte +} + +func newNode16(prefix []byte) *node16 { + nn := &node16{} + nn.setPrefix(prefix) + return nn +} + +func (n *node16) isLeaf() bool { return false } +func (n *node16) base() *meta { return &n.meta } + +func (n *node16) setPrefix(pre []byte) { + n.prefixLen = uint16(min(len(pre), maxPrefixLen)) + for i := uint16(0); i < n.prefixLen; i++ { + n.prefix[i] = pre[i] + } +} + +// Currently we do not keep node16 sorted or use bitfields for traversal so just add to the end. +// TODO(dlc) - We should revisit here with more detailed benchmarks. +func (n *node16) addChild(c byte, nn node) { + if n.size >= 16 { + panic("node16 full!") + } + n.key[n.size] = c + n.child[n.size] = nn + n.size++ +} + +func (n *node16) numChildren() uint16 { return n.size } +func (n *node16) path() []byte { return n.prefix[:n.prefixLen] } + +func (n *node16) findChild(c byte) *node { + for i := uint16(0); i < n.size; i++ { + if n.key[i] == c { + return &n.child[i] + } + } + return nil +} + +func (n *node16) isFull() bool { return n.size >= 16 } + +func (n *node16) grow() node { + nn := newNode256(n.prefix[:n.prefixLen]) + for i := 0; i < 16; i++ { + nn.addChild(n.key[i], n.child[i]) + } + return nn +} + +// Deletes a child from the node. +func (n *node16) deleteChild(c byte) { + for i, last := uint16(0), n.size-1; i < n.size; i++ { + if n.key[i] == c { + // Unsorted so just swap in last one here, else nil if last. + if i < last { + n.key[i] = n.key[last] + n.child[i] = n.child[last] + n.key[last] = 0 + n.child[last] = nil + } else { + n.key[i] = 0 + n.child[i] = nil + } + n.size-- + return + } + } +} + +// Shrink if needed and return new node, otherwise return nil. +func (n *node16) shrink() node { + if n.size > 4 { + return nil + } + nn := newNode4(nil) + for i := uint16(0); i < n.size; i++ { + nn.addChild(n.key[i], n.child[i]) + } + return nn +} + +// Will match parts against our prefix.no +func (n *node16) matchParts(parts [][]byte) ([][]byte, bool) { + return matchParts(parts, n.prefix[:n.prefixLen]) +} + +// Iterate over all children calling func f. +func (n *node16) iter(f func(node) bool) { + for i := uint16(0); i < n.size; i++ { + if !f(n.child[i]) { + return + } + } +} + +// Return our children as a slice. +func (n *node16) children() []node { + return n.child[:n.size] +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/node256.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/node256.go new file mode 100644 index 0000000000..c7bcd33620 --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/node256.go @@ -0,0 +1,97 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +// Node with 256 children +type node256 struct { + meta + child [256]node +} + +func newNode256(prefix []byte) *node256 { + nn := &node256{} + nn.setPrefix(prefix) + return nn +} + +func (n *node256) isLeaf() bool { return false } +func (n *node256) base() *meta { return &n.meta } + +func (n *node256) setPrefix(pre []byte) { + n.prefixLen = uint16(min(len(pre), maxPrefixLen)) + for i := uint16(0); i < n.prefixLen; i++ { + n.prefix[i] = pre[i] + } +} + +func (n *node256) addChild(c byte, nn node) { + n.child[c] = nn + n.size++ +} + +func (n *node256) numChildren() uint16 { return n.size } +func (n *node256) path() []byte { return n.prefix[:n.prefixLen] } + +func (n *node256) findChild(c byte) *node { + if n.child[c] != nil { + return &n.child[c] + } + return nil +} + +func (n *node256) isFull() bool { return false } +func (n *node256) grow() node { panic("grow can not be called on node256") } + +// Deletes a child from the node. +func (n *node256) deleteChild(c byte) { + if n.child[c] != nil { + n.child[c] = nil + n.size-- + } +} + +// Shrink if needed and return new node, otherwise return nil. +func (n *node256) shrink() node { + if n.size > 16 { + return nil + } + nn := newNode16(nil) + for c, child := range n.child { + if child != nil { + nn.addChild(byte(c), n.child[c]) + } + } + return nn +} + +// Will match parts against our prefix. +func (n *node256) matchParts(parts [][]byte) ([][]byte, bool) { + return matchParts(parts, n.prefix[:n.prefixLen]) +} + +// Iterate over all children calling func f. +func (n *node256) iter(f func(node) bool) { + for i := 0; i < 256; i++ { + if n.child[i] != nil { + if !f(n.child[i]) { + return + } + } + } +} + +// Return our children as a slice. +func (n *node256) children() []node { + return n.child[:256] +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/node4.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/node4.go new file mode 100644 index 0000000000..195299e266 --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/node4.go @@ -0,0 +1,116 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +// Node with 4 children +type node4 struct { + meta + child [4]node + key [4]byte +} + +func newNode4(prefix []byte) *node4 { + nn := &node4{} + nn.setPrefix(prefix) + return nn +} + +func (n *node4) isLeaf() bool { return false } +func (n *node4) base() *meta { return &n.meta } + +func (n *node4) setPrefix(pre []byte) { + n.prefixLen = uint16(min(len(pre), maxPrefixLen)) + for i := uint16(0); i < n.prefixLen; i++ { + n.prefix[i] = pre[i] + } +} + +// Currently we do not need to keep sorted for traversal so just add to the end. +func (n *node4) addChild(c byte, nn node) { + if n.size >= 4 { + panic("node4 full!") + } + n.key[n.size] = c + n.child[n.size] = nn + n.size++ +} + +func (n *node4) numChildren() uint16 { return n.size } +func (n *node4) path() []byte { return n.prefix[:n.prefixLen] } + +func (n *node4) findChild(c byte) *node { + for i := uint16(0); i < n.size; i++ { + if n.key[i] == c { + return &n.child[i] + } + } + return nil +} + +func (n *node4) isFull() bool { return n.size >= 4 } + +func (n *node4) grow() node { + nn := newNode16(n.prefix[:n.prefixLen]) + for i := 0; i < 4; i++ { + nn.addChild(n.key[i], n.child[i]) + } + return nn +} + +// Deletes a child from the node. +func (n *node4) deleteChild(c byte) { + for i, last := uint16(0), n.size-1; i < n.size; i++ { + if n.key[i] == c { + // Unsorted so just swap in last one here, else nil if last. + if i < last { + n.key[i] = n.key[last] + n.child[i] = n.child[last] + n.key[last] = 0 + n.child[last] = nil + } else { + n.key[i] = 0 + n.child[i] = nil + } + n.size-- + return + } + } +} + +// Shrink if needed and return new node, otherwise return nil. +func (n *node4) shrink() node { + if n.size == 1 { + return n.child[0] + } + return nil +} + +// Will match parts against our prefix. +func (n *node4) matchParts(parts [][]byte) ([][]byte, bool) { + return matchParts(parts, n.prefix[:n.prefixLen]) +} + +// Iterate over all children calling func f. +func (n *node4) iter(f func(node) bool) { + for i := uint16(0); i < n.size; i++ { + if !f(n.child[i]) { + return + } + } +} + +// Return our children as a slice. +func (n *node4) children() []node { + return n.child[:n.size] +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/parts.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/parts.go new file mode 100644 index 0000000000..71783ee11d --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/parts.go @@ -0,0 +1,134 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +import ( + "bytes" +) + +// genParts will break a filter subject up into parts. +// We need to break this up into chunks based on wildcards, either pwc '*' or fwc '>'. +// We do not care about other tokens per se, just parts that are separated by wildcards with an optional end fwc. +func genParts(filter []byte, parts [][]byte) [][]byte { + var start int + for i, e := 0, len(filter)-1; i < len(filter); i++ { + if filter[i] == tsep { + // See if next token is pwc. Either internal or end pwc. + if i < e && filter[i+1] == pwc && (i+2 <= e && filter[i+2] == tsep || i+1 == e) { + if i > start { + parts = append(parts, filter[start:i+1]) + } + parts = append(parts, filter[i+1:i+2]) + i++ // Skip pwc + if i+2 <= e { + i++ // Skip next tsep from next part too. + } + start = i + 1 + } else if i < e && filter[i+1] == fwc && i+1 == e { + // We have a fwc + if i > start { + parts = append(parts, filter[start:i+1]) + } + parts = append(parts, filter[i+1:i+2]) + i++ // Skip fwc + start = i + 1 + } + } else if filter[i] == pwc || filter[i] == fwc { + // We start with a pwc or fwc. + parts = append(parts, filter[i:i+1]) + if i+1 <= e { + i++ // Skip next tsep from next part too. + } + start = i + 1 + } + } + if start < len(filter) { + // Check to see if we need to eat a leading tsep. + if filter[start] == tsep { + start++ + } + parts = append(parts, filter[start:]) + } + return parts +} + +// Match our parts against a fragment, which could be prefix for nodes or a suffix for leafs. +func matchParts(parts [][]byte, frag []byte) ([][]byte, bool) { + if len(frag) == 0 { + return parts, true + } + + var si int + lpi := len(parts) - 1 + lf := len(frag) + + for i, part := range parts { + if si >= lf { + return parts[i:], true + } + lp := len(part) + // Check for pwc or fwc place holders. + if lp == 1 { + if part[0] == pwc { + index := bytes.IndexByte(frag[si:], tsep) + // We are trying to match pwc and did not find our tsep. + // Will need to move to next node from caller. + if index < 0 { + if i == lpi { + return nil, true + } + return parts[i:], true + } + si += index + 1 + continue + } else if part[0] == fwc { + // If we are here we should be good. + return nil, true + } + } + end := min(si+lp, len(frag)) + // If part is bigger then the fragment, adjust to a portion on the part. + partialPart := lp > end + if partialPart { + // Frag is smaller then part itself. + part = part[:end] + } + if !bytes.Equal(part, frag[si:end]) { + return parts, false + } + // If we still have a portion of the fragment left, update and continue. + if end < lf { + si = end + continue + } + // If we matched a partial, do not move past current part + // but update the part to what was consumed. This allows upper layers to continue. + if end < lp { + if end >= lf { + parts = append([][]byte{}, parts...) // Create a copy before modifying. + parts[i] = parts[i][lf:] + } else { + i++ + } + return parts[i:], true + } + if i == lpi { + return nil, true + } + // If we are here we are not the last part which means we have a wildcard + // gap, so we need to match anything up to next tsep. + si += len(part) + } + return parts, false +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go new file mode 100644 index 0000000000..452f5f35e2 --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go @@ -0,0 +1,380 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +import ( + "bytes" + "sort" +) + +// SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects. +// Will use dynamic nodes, path compression and lazy expansion. +// The reason this exists is to not only save some memory in our filestore but to greatly optimize matching +// a wildcard subject to certain members, e.g. consumer NumPending calculations. +type SubjectTree[T any] struct { + root node + size int +} + +// NewSubjectTree creates a new SubjectTree with values T. +func NewSubjectTree[T any]() *SubjectTree[T] { + return &SubjectTree[T]{} +} + +// Size returns the number of elements stored. +func (t *SubjectTree[T]) Size() int { + if t == nil { + return 0 + } + return t.size +} + +// Will empty out the tree, or if tree is nil create a new one. +func (t *SubjectTree[T]) Empty() *SubjectTree[T] { + if t == nil { + return NewSubjectTree[T]() + } + t.root, t.size = nil, 0 + return t +} + +// Insert a value into the tree. Will return if the value was updated and if so the old value. +func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) { + old, updated := t.insert(&t.root, subject, value, 0) + if !updated { + t.size++ + } + return old, updated +} + +// Find will find the value and return it or false if it was not found. +func (t *SubjectTree[T]) Find(subject []byte) (*T, bool) { + var si uint16 + for n := t.root; n != nil; { + if n.isLeaf() { + if ln := n.(*leaf[T]); ln.match(subject[si:]) { + return &ln.value, true + } + return nil, false + } + // We are a node type here, grab meta portion. + if bn := n.base(); bn.prefixLen > 0 { + end := min(int(si+bn.prefixLen), len(subject)) + if !bytes.Equal(subject[si:end], bn.prefix[:bn.prefixLen]) { + return nil, false + } + // Increment our subject index. + si += bn.prefixLen + } + if an := n.findChild(pivot(subject, si)); an != nil { + n = *an + } else { + return nil, false + } + } + return nil, false +} + +// Delete will delete the item and return its value, or not found if it did not exist. +func (t *SubjectTree[T]) Delete(subject []byte) (*T, bool) { + val, deleted := t.delete(&t.root, subject, 0) + if deleted { + t.size-- + } + return val, deleted +} + +// Match will match against a subject that can have wildcards and invoke the callback func for each matched value. +func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) { + if len(filter) == 0 || cb == nil { + return + } + // We need to break this up into chunks based on wildcards, either pwc '*' or fwc '>'. + var raw [16][]byte + parts := genParts(filter, raw[:0]) + var _pre [256]byte + t.match(t.root, parts, _pre[:0], cb) +} + +// Iter will walk all entries in the SubjectTree lexographically. The callback can return false to terminate the walk. +func (t *SubjectTree[T]) Iter(cb func(subject []byte, val *T) bool) { + if t == nil || t.root == nil { + return + } + var _pre [256]byte + t.iter(t.root, _pre[:0], cb) +} + +// Internal methods + +// Internal call to insert that can be recursive. +func (t *SubjectTree[T]) insert(np *node, subject []byte, value T, si int) (*T, bool) { + n := *np + if n == nil { + *np = newLeaf(subject, value) + return nil, false + } + if n.isLeaf() { + ln := n.(*leaf[T]) + if ln.match(subject[si:]) { + // Replace with new value. + old := ln.value + ln.value = value + return &old, true + } + // Here we need to split this leaf. + cpi := commonPrefixLen(ln.suffix, subject[si:]) + nn := newNode4(subject[si : si+cpi]) + ln.setSuffix(ln.suffix[cpi:]) + si += cpi + // Make sure we have different pivot, normally this will be the case unless we have overflowing prefixes. + if p := pivot(ln.suffix, 0); si < len(subject) && p == subject[si] { + // We need to split the original leaf. Recursively call into insert. + t.insert(np, subject, value, si) + // Now add the update version of *np as a child to the new node4. + nn.addChild(p, *np) + } else { + // Can just add this new leaf as a sibling. + nl := newLeaf(subject[si:], value) + nn.addChild(pivot(nl.suffix, 0), nl) + // Add back original. + nn.addChild(pivot(ln.suffix, 0), ln) + } + *np = nn + return nil, false + } + + // Non-leaf nodes. + bn := n.base() + if bn.prefixLen > 0 { + cpi := commonPrefixLen(bn.prefix[:bn.prefixLen], subject[si:]) + if pli := int(bn.prefixLen); cpi >= pli { + // Move past this node. We look for an existing child node to recurse into. + // If one does not exist we can create a new leaf node. + si += pli + if nn := n.findChild(pivot(subject, si)); nn != nil { + return t.insert(nn, subject, value, si) + } + if n.isFull() { + n = n.grow() + *np = n + } + n.addChild(pivot(subject, si), newLeaf(subject[si:], value)) + return nil, false + } else { + // We did not match the prefix completely here. + // Calculate new prefix for this node. + prefix := subject[si : si+cpi] + si += len(prefix) + // We will insert a new node4 and attach our current node below after adjusting prefix. + nn := newNode4(prefix) + // Shift the prefix for our original node. + n.setPrefix(bn.prefix[cpi:bn.prefixLen]) + nn.addChild(pivot(bn.prefix[:], 0), n) + // Add in our new leaf. + nn.addChild(pivot(subject[si:], 0), newLeaf(subject[si:], value)) + // Update our node reference. + *np = nn + } + } else { + if nn := n.findChild(pivot(subject, si)); nn != nil { + return t.insert(nn, subject, value, si) + } + // No prefix and no matched child, so add in new leafnode as needed. + if n.isFull() { + n = n.grow() + *np = n + } + n.addChild(pivot(subject, si), newLeaf(subject[si:], value)) + } + + return nil, false +} + +// internal function to recursively find the leaf to delete. Will do compaction if the item is found and removed. +func (t *SubjectTree[T]) delete(np *node, subject []byte, si uint16) (*T, bool) { + if t == nil || np == nil || *np == nil || len(subject) == 0 { + return nil, false + } + n := *np + if n.isLeaf() { + ln := n.(*leaf[T]) + if ln.match(subject[si:]) { + *np = nil + return &ln.value, true + } + return nil, false + } + // Not a leaf node. + if bn := n.base(); bn.prefixLen > 0 { + if !bytes.Equal(subject[si:si+bn.prefixLen], bn.prefix[:bn.prefixLen]) { + return nil, false + } + // Increment our subject index. + si += bn.prefixLen + } + p := pivot(subject, si) + nna := n.findChild(p) + if nna == nil { + return nil, false + } + nn := *nna + if nn.isLeaf() { + ln := nn.(*leaf[T]) + if ln.match(subject[si:]) { + n.deleteChild(p) + + if sn := n.shrink(); sn != nil { + bn := n.base() + // Make sure to set cap so we force an append to copy below. + pre := bn.prefix[:bn.prefixLen:bn.prefixLen] + // Need to fix up prefixes/suffixes. + if sn.isLeaf() { + ln := sn.(*leaf[T]) + // Make sure to set cap so we force an append to copy. + ln.suffix = append(pre, ln.suffix...) + } else { + // We are a node here, we need to add in the old prefix. + if len(pre) > 0 { + bsn := sn.base() + sn.setPrefix(append(pre, bsn.prefix[:bsn.prefixLen]...)) + } + } + *np = sn + } + + return &ln.value, true + } + return nil, false + } + return t.delete(nna, subject, si) +} + +// Internal function which can be called recursively to match all leaf nodes to a given filter subject which +// once here has been decomposed to parts. These parts only care about wildcards, both pwc and fwc. +func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subject []byte, val *T)) { + // Capture if we are sitting on a terminal fwc. + var hasFWC bool + if lp := len(parts); lp > 0 && parts[lp-1][0] == fwc { + hasFWC = true + } + + for n != nil { + nparts, matched := n.matchParts(parts) + // Check if we did not match. + if !matched { + return + } + // We have matched here. If we are a leaf and have exhausted all parts or he have a FWC fire callback. + if n.isLeaf() { + if len(nparts) == 0 || hasFWC { + ln := n.(*leaf[T]) + cb(append(pre, ln.suffix...), &ln.value) + } + return + } + // We have normal nodes here. + // We need to append our prefix + bn := n.base() + if bn.prefixLen > 0 { + // Note that this append may reallocate, but it doesn't modify "pre" at the "match" callsite. + pre = append(pre, bn.prefix[:bn.prefixLen]...) + } + + // Check our remaining parts. + if len(nparts) == 0 && !hasFWC { + // We are a node with no parts left and we are not looking at a fwc. + // We could have a leafnode with no suffix which would be a match. + // We could also have a terminal pwc. Check for those here. + var hasTermPWC bool + if lp := len(parts); lp > 0 && len(parts[lp-1]) == 1 && parts[lp-1][0] == pwc { + // If we are sitting on a terminal pwc, put the pwc back and continue. + nparts = parts[len(parts)-1:] + hasTermPWC = true + } + for _, cn := range n.children() { + if cn == nil { + continue + } + if cn.isLeaf() { + ln := cn.(*leaf[T]) + if len(ln.suffix) == 0 { + cb(append(pre, ln.suffix...), &ln.value) + } else if hasTermPWC && bytes.IndexByte(ln.suffix, tsep) < 0 { + cb(append(pre, ln.suffix...), &ln.value) + } + } else if hasTermPWC { + // We have terminal pwc so call into match again with the child node. + t.match(cn, nparts, pre, cb) + } + } + // Return regardless. + return + } + // If we are sitting on a terminal fwc, put back and continue. + if hasFWC && len(nparts) == 0 { + nparts = parts[len(parts)-1:] + } + + // Here we are a node type with a partial match. + // Check if the first part is a wildcard. + fp := nparts[0] + p := pivot(fp, 0) + // Check if we have a pwc/fwc part here. This will cause us to iterate. + if len(fp) == 1 && (p == pwc || p == fwc) { + // We need to iterate over all children here for the current node + // to see if we match further down. + for _, cn := range n.children() { + if cn != nil { + t.match(cn, nparts, pre, cb) + } + } + } + // Here we have normal traversal, so find the next child. + nn := n.findChild(p) + if nn == nil { + return + } + n, parts = *nn, nparts + } +} + +// Interal iter function to walk nodes in lexigraphical order. +func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T) bool) bool { + if n.isLeaf() { + ln := n.(*leaf[T]) + return cb(append(pre, ln.suffix...), &ln.value) + } + // We are normal node here. + bn := n.base() + // Note that this append may reallocate, but it doesn't modify "pre" at the "iter" callsite. + pre = append(pre, bn.prefix[:bn.prefixLen]...) + // Collect nodes since unsorted. + var _nodes [256]node + nodes := _nodes[:0] + for _, cn := range n.children() { + if cn != nil { + nodes = append(nodes, cn) + } + } + // Now sort. + sort.SliceStable(nodes, func(i, j int) bool { return bytes.Compare(nodes[i].path(), nodes[j].path()) < 0 }) + // Now walk the nodes in order and call into next iter. + for i := range nodes { + if !t.iter(nodes[i], pre, cb) { + return false + } + } + return true +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/util.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/util.go new file mode 100644 index 0000000000..5ced54bc95 --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/util.go @@ -0,0 +1,61 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +// For subject matching. +const ( + pwc = '*' + fwc = '>' + tsep = '.' +) + +// Determine index of common prefix. No match at all is 0, etc. +func commonPrefixLen(s1, s2 []byte) int { + limit := min(len(s1), len(s2)) + var i int + for ; i < limit; i++ { + if s1[i] != s2[i] { + break + } + } + return min(i, maxPrefixLen) +} + +// Helper to copy bytes. +func copyBytes(src []byte) []byte { + if len(src) == 0 { + return nil + } + dst := make([]byte, len(src)) + copy(dst, src) + return dst +} + +type position interface{ int | uint16 } + +// Can return 0 if we have all the subject as prefixes. +func pivot[N position](subject []byte, pos N) byte { + if int(pos) >= len(subject) { + return 0 + } + return subject[pos] +} + +// TODO(dlc) - Can be removed with Go 1.21 once server is on Go 1.22. +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d1db0b36c5..a616ffaad3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1241,9 +1241,10 @@ github.com/justinas/alice # github.com/kevinburke/ssh_config v1.2.0 ## explicit github.com/kevinburke/ssh_config -# github.com/klauspost/compress v1.17.4 +# github.com/klauspost/compress v1.17.5 ## explicit; go 1.19 github.com/klauspost/compress/flate +github.com/klauspost/compress/internal/race github.com/klauspost/compress/s2 # github.com/klauspost/cpuid/v2 v2.1.0 ## explicit; go 1.15 @@ -1398,7 +1399,7 @@ github.com/mschoch/smat # github.com/nats-io/jwt/v2 v2.5.3 ## explicit; go 1.18 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.10.9 +# github.com/nats-io/nats-server/v2 v2.10.10 ## explicit; go 1.20 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/fastrand @@ -1409,6 +1410,7 @@ github.com/nats-io/nats-server/v2/server/avl github.com/nats-io/nats-server/v2/server/certidp github.com/nats-io/nats-server/v2/server/certstore github.com/nats-io/nats-server/v2/server/pse +github.com/nats-io/nats-server/v2/server/stree github.com/nats-io/nats-server/v2/server/sysmem # github.com/nats-io/nats.go v1.32.0 ## explicit; go 1.20