From 1ba45133286f52607c40e3d5e2bdc8e41758af44 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 22 May 2025 15:00:24 +0000 Subject: [PATCH] build(deps): bump github.com/nats-io/nats-server/v2 Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.11.3 to 2.11.4. - [Release notes](https://github.com/nats-io/nats-server/releases) - [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml) - [Commits](https://github.com/nats-io/nats-server/compare/v2.11.3...v2.11.4) --- updated-dependencies: - dependency-name: github.com/nats-io/nats-server/v2 dependency-version: 2.11.4 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 4 +- go.sum | 8 +- .../google/go-tpm/legacy/tpm2/constants.go | 1 + .../google/go-tpm/legacy/tpm2/tpm2.go | 36 ++- .../nats-server/v2/server/README-MQTT.md | 2 +- .../nats-io/nats-server/v2/server/ats/ats.go | 83 +++++ .../nats-io/nats-server/v2/server/client.go | 18 +- .../nats-io/nats-server/v2/server/const.go | 2 +- .../nats-io/nats-server/v2/server/consumer.go | 5 + .../nats-server/v2/server/filestore.go | 301 +++++++++--------- .../nats-io/nats-server/v2/server/gateway.go | 64 ++-- .../nats-server/v2/server/jetstream.go | 1 - .../v2/server/jetstream_cluster.go | 13 + .../nats-io/nats-server/v2/server/memstore.go | 10 +- .../nats-io/nats-server/v2/server/store.go | 2 +- .../nats-io/nats-server/v2/server/stream.go | 9 +- .../nats-server/v2/server/stree/stree.go | 34 +- vendor/modules.txt | 5 +- 18 files changed, 399 insertions(+), 199 deletions(-) create mode 100644 vendor/github.com/nats-io/nats-server/v2/server/ats/ats.go diff --git a/go.mod b/go.mod index f76431f37c..496c379f89 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.11.3 + github.com/nats-io/nats-server/v2 v2.11.4 github.com/nats-io/nats.go v1.42.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v1.0.6 @@ -215,7 +215,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/gomodule/redigo v1.9.2 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/google/go-tpm v0.9.3 // indirect + github.com/google/go-tpm v0.9.5 // indirect github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect github.com/google/renameio/v2 v2.0.0 // indirect github.com/gookit/color v1.5.4 // indirect diff --git a/go.sum b/go.sum index 9b6a2718b1..3a0d05aa63 100644 --- a/go.sum +++ b/go.sum @@ -527,8 +527,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/go-tika v0.3.1 h1:l+jr10hDhZjcgxFRfcQChRLo1bPXQeLFluMyvDhXTTA= github.com/google/go-tika v0.3.1/go.mod h1:DJh5N8qxXIl85QkqmXknd+PeeRkUOTbvwyYf7ieDz6c= -github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc= -github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU= +github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -820,8 +820,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI= github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats-server/v2 v2.11.3 h1:AbGtXxuwjo0gBroLGGr/dE0vf24kTKdRnBq/3z/Fdoc= -github.com/nats-io/nats-server/v2 v2.11.3/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY= +github.com/nats-io/nats-server/v2 v2.11.4 h1:oQhvy6He6ER926sGqIKBKuYHH4BGnUQCNb0Y5Qa+M54= +github.com/nats-io/nats-server/v2 v2.11.4/go.mod h1:jFnKKwbNeq6IfLHq+OMnl7vrFRihQ/MkhRbiWfjLdjU= github.com/nats-io/nats.go v1.42.0 h1:ynIMupIOvf/ZWH/b2qda6WGKGNSjwOUutTpWRvAmhaM= github.com/nats-io/nats.go v1.42.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= diff --git a/vendor/github.com/google/go-tpm/legacy/tpm2/constants.go b/vendor/github.com/google/go-tpm/legacy/tpm2/constants.go index 2b0de54444..1357370aa2 100644 --- a/vendor/github.com/google/go-tpm/legacy/tpm2/constants.go +++ b/vendor/github.com/google/go-tpm/legacy/tpm2/constants.go @@ -447,6 +447,7 @@ const ( CmdClear tpmutil.Command = 0x00000126 CmdHierarchyChangeAuth tpmutil.Command = 0x00000129 CmdDefineSpace tpmutil.Command = 0x0000012A + CmdPCRAllocate tpmutil.Command = 0x0000012B CmdCreatePrimary tpmutil.Command = 0x00000131 CmdIncrementNVCounter tpmutil.Command = 0x00000134 CmdWriteNV tpmutil.Command = 0x00000137 diff --git a/vendor/github.com/google/go-tpm/legacy/tpm2/tpm2.go b/vendor/github.com/google/go-tpm/legacy/tpm2/tpm2.go index 18d5a96033..0105c37294 100644 --- a/vendor/github.com/google/go-tpm/legacy/tpm2/tpm2.go +++ b/vendor/github.com/google/go-tpm/legacy/tpm2/tpm2.go @@ -51,6 +51,10 @@ func encodeTPMLPCRSelection(sel ...PCRSelection) ([]byte, error) { return tpmutil.Pack(uint32(0)) } + if len(sel) == 1 && len(sel[0].PCRs) == 0 && sel[0].Hash == 0 { + return tpmutil.Pack(uint32(0)) + } + // PCR selection is a variable-size bitmask, where position of a set bit is // the selected PCR index. // Size of the bitmask in bytes is pre-pended. It should be at least @@ -61,10 +65,6 @@ func encodeTPMLPCRSelection(sel ...PCRSelection) ([]byte, error) { // 00000011 00000000 00000001 00000100 var retBytes []byte for _, s := range sel { - if len(s.PCRs) == 0 { - return tpmutil.Pack(uint32(0)) - } - ts := tpmsPCRSelection{ Hash: s.Hash, Size: sizeOfPCRSelect, @@ -1153,6 +1153,34 @@ func Clear(rw io.ReadWriter, handle tpmutil.Handle, auth AuthCommand) error { return err } +func encodePCRAllocate(handle tpmutil.Handle, auth AuthCommand, pcrSelection []PCRSelection) ([]byte, error) { + ah, err := tpmutil.Pack(handle) + if err != nil { + return nil, err + } + authEncoded, err := encodeAuthArea(auth) + if err != nil { + return nil, err + } + + pcrEncoded, err := encodeTPMLPCRSelection(pcrSelection...) + if err != nil { + return nil, err + } + return concat(ah, authEncoded, pcrEncoded) +} + +// PCRAllocate sets the desired PCR allocation of PCR and algorithms. +// The changes take effect once the TPM is restarted. +func PCRAllocate(rw io.ReadWriter, handle tpmutil.Handle, auth AuthCommand, pcrSelection []PCRSelection) error { + Cmd, err := encodePCRAllocate(handle, auth, pcrSelection) + if err != nil { + return err + } + _, err = runCommand(rw, TagSessions, CmdPCRAllocate, tpmutil.RawBytes(Cmd)) + return err +} + func encodeHierarchyChangeAuth(handle tpmutil.Handle, auth AuthCommand, newAuth string) ([]byte, error) { ah, err := tpmutil.Pack(handle) if err != nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/README-MQTT.md b/vendor/github.com/nats-io/nats-server/v2/server/README-MQTT.md index ddab83a149..6b420a847a 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/README-MQTT.md +++ b/vendor/github.com/nats-io/nats-server/v2/server/README-MQTT.md @@ -5,7 +5,7 @@ Revision 1.1 Authors: Ivan Kozlovic, Lev Brouk NATS Server currently supports most of MQTT 3.1.1. This document describes how -it is implementated. +it is implemented. It is strongly recommended to review the [MQTT v3.1.1 specifications](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/ats/ats.go b/vendor/github.com/nats-io/nats-server/v2/server/ats/ats.go new file mode 100644 index 0000000000..c310ccbd07 --- /dev/null +++ b/vendor/github.com/nats-io/nats-server/v2/server/ats/ats.go @@ -0,0 +1,83 @@ +// Copyright 2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// ats controls the go routines for the access time service. +// This allows more efficient unixnano operations for cache access times. +// We will have one per binary (usually per server). +package ats + +import ( + "sync/atomic" + "time" +) + +// Update every 100ms for gathering access time in unix nano. +const TickInterval = 100 * time.Millisecond + +var ( + // Our unix nano time. + utime atomic.Int64 + // How may registered users do we have, controls lifetime of Go routine. + refs atomic.Int64 + // To signal the shutdown of the Go routine. + done chan struct{} +) + +func init() { + // Initialize our done chan. + done = make(chan struct{}, 1) +} + +// Register usage. This will happen on filestore creation. +func Register() { + if v := refs.Add(1); v == 1 { + // This is the first to register (could also go up and down), + // so spin up Go routine and grab initial time. + utime.Store(time.Now().UnixNano()) + + go func() { + ticker := time.NewTicker(TickInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + utime.Store(time.Now().UnixNano()) + case <-done: + return + } + } + }() + } +} + +// Unregister usage. We will shutdown the go routine if no more registered users. +func Unregister() { + if v := refs.Add(-1); v == 0 { + done <- struct{}{} + } else if v < 0 { + refs.Store(0) + panic("unbalanced unregister for access time state") + } +} + +// Will load the access time from an atomic. +// If no one has registered this will return 0 or stale data. +// It is the responsibility of the user to properly register and unregister. +func AccessTime() int64 { + // Return last updated time. + v := utime.Load() + if v == 0 { + panic("access time service not running") + } + return v +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index bc59b7259c..006db041b2 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -15,8 +15,10 @@ package server import ( "bytes" + "crypto/sha256" "crypto/tls" "crypto/x509" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -6066,10 +6068,22 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi } if err != nil { + var detail string + var subjs []string + if ve, ok := err.(*tls.CertificateVerificationError); ok { + for _, cert := range ve.UnverifiedCertificates { + fp := sha256.Sum256(cert.Raw) + fph := hex.EncodeToString(fp[:]) + subjs = append(subjs, fmt.Sprintf("%s SHA-256: %s", cert.Subject.String(), fph)) + } + } + if len(subjs) > 0 { + detail = fmt.Sprintf(" (%s)", strings.Join(subjs, "; ")) + } if kind == CLIENT { - c.Errorf("TLS handshake error: %v", err) + c.Errorf("TLS handshake error: %v%s", err, detail) } else { - c.Errorf("TLS %s handshake error: %v", typ, err) + c.Errorf("TLS %s handshake error: %v%s", typ, err, detail) } c.closeConnection(TLSHandshakeError) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index 0ae04ef51b..9753a62911 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -58,7 +58,7 @@ func init() { const ( // VERSION is the current version for the server. - VERSION = "2.11.3" + VERSION = "2.11.4" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index 302e1cba08..d101d54ca3 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -2976,6 +2976,11 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { TimeStamp: time.Now().UTC(), PriorityGroups: priorityGroups, } + // Reset redelivered for MaxDeliver 1. Redeliveries are disabled so must not report it (is confusing otherwise). + // The state does still keep track of these messages. + if o.cfg.MaxDeliver == 1 { + info.NumRedelivered = 0 + } if o.cfg.PauseUntil != nil { p := *o.cfg.PauseUntil if info.Paused = time.Now().Before(p); info.Paused { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index f7b855f814..392e71351d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -43,6 +43,7 @@ import ( "github.com/klauspost/compress/s2" "github.com/minio/highwayhash" + "github.com/nats-io/nats-server/v2/server/ats" "github.com/nats-io/nats-server/v2/server/avl" "github.com/nats-io/nats-server/v2/server/stree" "github.com/nats-io/nats-server/v2/server/thw" @@ -367,7 +368,7 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) { return newFileStoreWithCreated(fcfg, cfg, time.Now().UTC(), nil, nil) } -func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time, prf, oldprf keyGen) (*fileStore, error) { +func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time, prf, oldprf keyGen) (fs *fileStore, err error) { if cfg.Name == _EMPTY_ { return nil, fmt.Errorf("name required") } @@ -409,7 +410,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim os.Remove(tmpfile.Name()) dios <- struct{}{} - fs := &fileStore{ + fs = &fileStore{ fcfg: fcfg, psim: stree.NewSubjectTree[psi](), bim: make(map[uint32]*msgBlock), @@ -421,6 +422,16 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim srv: fcfg.srv, } + // Register with access time service. + ats.Register() + + // If we error before completion make sure to cleanup. + defer func() { + if err != nil { + ats.Unregister() + } + }() + // Only create a THW if we're going to allow TTLs. if cfg.AllowMsgTTL { fs.ttls = thw.NewHashWheel() @@ -475,18 +486,20 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim } // Check if our prior state remembers a last sequence past where we can see. - if fs.ld != nil && prior.LastSeq > fs.state.LastSeq { + if prior.LastSeq > fs.state.LastSeq { fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime if fs.state.Msgs == 0 { fs.state.FirstSeq = fs.state.LastSeq + 1 fs.state.FirstTime = time.Time{} } - if _, err := fs.newMsgBlockForWrite(); err == nil { - if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil { + if fs.ld != nil { + if _, err := fs.newMsgBlockForWrite(); err == nil { + if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil { + return nil, err + } + } else { return nil, err } - } else { - return nil, err } } // Since we recovered here, make sure to kick ourselves to write out our stream state. @@ -503,7 +516,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Also make sure we get rid of old idx and fss files on return. // Do this in separate go routine vs inline and at end of processing. defer func() { - go fs.cleanupOldMeta() + if fs != nil { + go fs.cleanupOldMeta() + } }() // Lock while we do enforcements and removals. @@ -2353,7 +2368,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg) var updateLLTS bool defer func() { if updateLLTS { - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() } mb.mu.Unlock() }() @@ -2468,12 +2483,12 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor var updateLLTS bool defer func() { if updateLLTS { - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() } mb.mu.Unlock() }() - fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter} + fseq, isAll := start, filter == _EMPTY_ || filter == fwcs var didLoad bool if mb.fssNotLoaded() { @@ -2482,7 +2497,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor didLoad = true } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() if filter == _EMPTY_ { filter = fwcs @@ -2501,18 +2516,15 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor } } // Make sure to start at mb.first.seq if fseq < mb.first.seq - if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq { - fseq = seq - } + fseq = max(fseq, atomic.LoadUint64(&mb.first.seq)) lseq := atomic.LoadUint64(&mb.last.seq) // Optionally build the isMatch for wildcard filters. - _tsa, _fsa := [32]string{}, [32]string{} - tsa, fsa := _tsa[:0], _fsa[:0] var isMatch func(subj string) bool // Decide to build. if wc { - fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) + _tsa, _fsa := [32]string{}, [32]string{} + tsa, fsa := _tsa[:0], tokenizeSubjectIntoSlice(_fsa[:0], filter) isMatch = func(subj string) bool { tsa = tokenizeSubjectIntoSlice(tsa[:0], subj) return isSubsetMatchTokenized(tsa, fsa) @@ -2532,29 +2544,22 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor if !doLinearScan { // If we have a wildcard match against all tracked subjects we know about. - if wc { - subs = subs[:0] - mb.fss.Match(stringToBytes(filter), func(bsubj []byte, _ *SimpleState) { - subs = append(subs, string(bsubj)) - }) - // Check if we matched anything - if len(subs) == 0 { - return nil, didLoad, ErrStoreMsgNotFound - } - } fseq = lseq + 1 - for _, subj := range subs { - ss, _ := mb.fss.Find(stringToBytes(subj)) - if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { - mb.recalculateForSubj(subj, ss) + if bfilter := stringToBytes(filter); wc { + mb.fss.Match(bfilter, func(bsubj []byte, ss *SimpleState) { + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(bytesToString(bsubj), ss) + } + if start <= ss.Last { + fseq = min(fseq, max(start, ss.First)) + } + }) + } else if ss, _ := mb.fss.Find(bfilter); ss != nil { + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(filter, ss) } - if ss == nil || start > ss.Last || ss.First >= fseq { - continue - } - if ss.First < start { - fseq = start - } else { - fseq = ss.First + if start <= ss.Last { + fseq = min(fseq, max(start, ss.First)) } } } @@ -2563,13 +2568,6 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor return nil, didLoad, ErrStoreMsgNotFound } - // If we guess to not do a linear scan, but the above resulted in alot of subs that will - // need to be checked for every scanned message, revert. - // TODO(dlc) - we could memoize the subs across calls. - if !doLinearScan && len(subs) > int(lseq-fseq) { - doLinearScan = true - } - // Need messages loaded from here on out. if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -2602,18 +2600,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor if isAll { return fsm, expireOk, nil } - if doLinearScan { - if wc && isMatch(sm.subj) { - return fsm, expireOk, nil - } else if !wc && fsm.subj == filter { - return fsm, expireOk, nil - } - } else { - for _, subj := range subs { - if fsm.subj == subj { - return fsm, expireOk, nil - } - } + if wc && isMatch(sm.subj) { + return fsm, expireOk, nil + } else if !wc && fsm.subj == filter { + return fsm, expireOk, nil } // If we are here we did not match, so put the llseq back. mb.llseq = llseq @@ -2996,7 +2986,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { shouldExpire = true } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { subj := string(bsubj) if ss.firstNeedsUpdate || ss.lastNeedsUpdate { @@ -3029,7 +3019,13 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { func (fs *fileStore) AllLastSeqs() ([]uint64, error) { fs.mu.RLock() defer fs.mu.RUnlock() + return fs.allLastSeqsLocked() +} +// allLastSeqsLocked will return a sorted list of last sequences for all +// subjects, but won't take the lock to do it, to avoid the issue of compounding +// read locks causing a deadlock with a write lock. +func (fs *fileStore) allLastSeqsLocked() ([]uint64, error) { if fs.state.Msgs == 0 || fs.noTrackSubjects() { return nil, nil } @@ -3100,7 +3096,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i // See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set. if maxSeq == 0 && maxAllowed <= 0 && fs.filterIsAll(filters) { - return fs.AllLastSeqs() + return fs.allLastSeqsLocked() } lastBlkIndex := len(fs.blks) - 1 @@ -3351,7 +3347,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() } mb.mu.Unlock() return total, validThrough @@ -3377,7 +3373,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) shouldExpire = true } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() var t uint64 var havePartial bool @@ -3477,7 +3473,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) shouldExpire = true } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) { adjust += ss.Msgs @@ -3517,7 +3513,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() } mb.mu.Unlock() } @@ -3660,7 +3656,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() } mb.mu.Unlock() return total, validThrough @@ -3685,7 +3681,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo shouldExpire = true } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() var t uint64 var havePartial bool @@ -3738,7 +3734,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() } mb.mu.Unlock() total += t @@ -3798,7 +3794,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo shouldExpire = true } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) { adjust += ss.Msgs }) @@ -3837,7 +3833,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() } mb.mu.Unlock() } @@ -3922,7 +3918,7 @@ func (mb *msgBlock) setupWriteCache(buf []byte) { if fi != nil { mb.cache.off = int(fi.Size()) } - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() mb.startCacheExpireTimer() } @@ -3953,7 +3949,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mb.fss = stree.NewSubjectTree[SimpleState]() // Set cache time to creation time to start. - mb.llts, mb.lwts = 0, getAccessTime() + mb.llts, mb.lwts = 0, ats.AccessTime() // Remember our last sequence number. atomic.StoreUint64(&mb.first.seq, fs.state.LastSeq+1) atomic.StoreUint64(&mb.last.seq, fs.state.LastSeq) @@ -4211,7 +4207,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) { return } var needsRecord bool - nowts := getAccessTime() + nowts := ats.AccessTime() mb.mu.Lock() // If we are empty can just do meta. @@ -4403,7 +4399,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { shouldExpire = true } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() bsubj := stringToBytes(subj) if ss, ok := mb.fss.Find(bsubj); ok && ss != nil { @@ -4492,11 +4488,18 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { var numMsgs uint64 // collect all that are not correct. - needAttention := make(map[string]*psi) + needAttention := stree.NewSubjectTree[uint64]() + fblk, lblk := uint32(math.MaxUint32), uint32(0) fs.psim.IterFast(func(subj []byte, psi *psi) bool { numMsgs += psi.total if psi.total > maxMsgsPer { - needAttention[string(subj)] = psi + needAttention.Insert(subj, psi.total) + if psi.fblk < fblk { + fblk = psi.fblk + } + if psi.lblk > lblk { + lblk = psi.lblk + } } return true }) @@ -4517,55 +4520,71 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { // Rebuild fs state too. fs.rebuildStateLocked(nil) // Need to redo blocks that need attention. - needAttention = make(map[string]*psi) + needAttention.Empty() + fblk, lblk = uint32(math.MaxUint32), uint32(0) fs.psim.IterFast(func(subj []byte, psi *psi) bool { if psi.total > maxMsgsPer { - needAttention[string(subj)] = psi + needAttention.Insert(subj, psi.total) + if psi.fblk < fblk { + fblk = psi.fblk + } + if psi.lblk > lblk { + lblk = psi.lblk + } } return true }) } + // If nothing to do then stop. + if fblk == math.MaxUint32 { + return + } + // Collect all the msgBlks we alter. blks := make(map[*msgBlock]struct{}) // For re-use below. var sm StoreMsg - - // Walk all subjects that need attention here. - for subj, info := range needAttention { - total, start, stop := info.total, info.fblk, info.lblk - - for i := start; i <= stop; i++ { - mb := fs.bim[i] - if mb == nil { - continue + var fss *stree.SubjectTree[*SimpleState] + for i := fblk; i <= lblk; i++ { + mb := fs.bim[i] + if mb == nil { + continue + } + mb.mu.Lock() + mb.ensurePerSubjectInfoLoaded() + // It isn't safe to intersect mb.fss directly, because removeMsgViaLimits modifies it + // during the iteration, which can cause us to miss keys. We won't copy the entire + // SimpleState structs though but rather just take pointers for speed. + fss = fss.Empty() + mb.fss.IterFast(func(subject []byte, val *SimpleState) bool { + fss.Insert(subject, val) + return true + }) + mb.mu.Unlock() + stree.LazyIntersect(needAttention, fss, func(subj []byte, total *uint64, ssptr **SimpleState) { + if ssptr == nil || total == nil { + return } - // Grab the ss entry for this subject in case sparse. - mb.mu.Lock() - mb.ensurePerSubjectInfoLoaded() - ss, ok := mb.fss.Find(stringToBytes(subj)) - if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { - mb.recalculateForSubj(subj, ss) + ss := *ssptr + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.mu.Lock() + mb.recalculateForSubj(bytesToString(subj), ss) + mb.mu.Unlock() } - mb.mu.Unlock() - if ss == nil { - continue - } - for seq := ss.First; seq <= ss.Last && total > maxMsgsPer; { - m, _, err := mb.firstMatching(subj, false, seq, &sm) - if err == nil { - seq = m.seq + 1 - if removed, _ := fs.removeMsgViaLimits(m.seq); removed { - total-- - blks[mb] = struct{}{} - } - } else { - // On error just do single increment. - seq++ + for first := ss.First; *total > maxMsgsPer && first <= ss.Last; { + m, _, err := mb.firstMatching(bytesToString(subj), false, first, &sm) + if err != nil { + break + } + first = m.seq + 1 + if removed, _ := fs.removeMsgViaLimits(m.seq); removed { + blks[mb] = struct{}{} + *total-- } } - } + }) } // Expire the cache if we can. @@ -4699,7 +4718,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) // Set cache timestamp for last remove. - mb.lrts = getAccessTime() + mb.lrts = ats.AccessTime() // Global stats if fs.state.Msgs > 0 { @@ -5489,7 +5508,7 @@ func (mb *msgBlock) expireCacheLocked() { } // Grab timestamp to compare. - tns := getAccessTime() + tns := ats.AccessTime() // For the core buffer of messages, we care about reads and writes, but not removes. bufts := mb.llts @@ -5599,7 +5618,7 @@ func (fs *fileStore) expireMsgs() { fs.mu.RLock() maxAge := int64(fs.cfg.MaxAge) - minAge := getAccessTime() - maxAge + minAge := ats.AccessTime() - maxAge rmcb := fs.rmcb sdmcb := fs.sdmcb sdmTTL := int64(fs.cfg.SubjectDeleteMarkerTTL.Seconds()) @@ -5616,7 +5635,7 @@ func (fs *fileStore) expireMsgs() { if len(sm.hdr) > 0 { if ttl, err := getMessageTTL(sm.hdr); err == nil && ttl < 0 { // The message has a negative TTL, therefore it must "never expire". - minAge = getAccessTime() - maxAge + minAge = ats.AccessTime() - maxAge continue } } @@ -5633,7 +5652,7 @@ func (fs *fileStore) expireMsgs() { fs.mu.Unlock() } // Recalculate in case we are expiring a bunch. - minAge = getAccessTime() - maxAge + minAge = ats.AccessTime() - maxAge } } @@ -5882,7 +5901,7 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg return err } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq @@ -6560,7 +6579,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { popFss = true } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() mb.ttls = 0 lbuf := uint32(len(buf)) @@ -6786,7 +6805,7 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) { // Decide what we want to do with the buffer in hand. If we have load interest // we will hold onto the whole thing, otherwise empty the buffer, possibly reusing it. - if ts := getAccessTime(); ts < mb.llts || (ts-mb.llts) <= int64(mb.cexp) { + if ts := ats.AccessTime(); ts < mb.llts || (ts-mb.llts) <= int64(mb.cexp) { mb.cache.wp += lob } else { if cap(mb.cache.buf) <= maxBufReuse { @@ -6949,7 +6968,7 @@ checkCache: return nil } - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() // FIXME(dlc) - We could be smarter here. if buf, _ := mb.bytesPending(); len(buf) > 0 { @@ -7138,7 +7157,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store // If we have a delete map check it. if mb.dmap.Exists(seq) { - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() return nil, errDeletedMsg } @@ -7169,7 +7188,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store } // Update cache activity. - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() li := int(bi) - mb.cache.off if li >= len(mb.cache.buf) { @@ -7430,7 +7449,7 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err return nil, err } // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() var l uint64 // Optimize if subject is not a wildcard. @@ -8332,8 +8351,13 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) { } else { // Make sure to sync changes. smb.needSync = true - // Update fs first seq and time. - atomic.StoreUint64(&smb.first.seq, seq-1) // Just for start condition for selectNextFirst. + // Just for start condition for selectNextFirst. + if smb.first.seq < seq { + atomic.StoreUint64(&smb.first.seq, seq-1) + } else { + // selectNextFirst always adds 1, so need to subtract 1 here. + atomic.StoreUint64(&smb.first.seq, smb.first.seq-1) + } smb.selectNextFirst() fs.state.FirstSeq = atomic.LoadUint64(&smb.first.seq) @@ -8941,7 +8965,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error { } // Create new one regardless. - mb.fss = stree.NewSubjectTree[SimpleState]() + mb.fss = mb.fss.Empty() var smv StoreMsg fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) @@ -8975,7 +8999,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error { if mb.fss.Size() > 0 { // Make sure we run the cache expire timer. - mb.llts = getAccessTime() + mb.llts = ats.AccessTime() // Mark fss activity same as load time. mb.lsts = mb.llts mb.startCacheExpireTimer() @@ -8989,7 +9013,7 @@ func (mb *msgBlock) ensurePerSubjectInfoLoaded() error { if mb.fss != nil || mb.noTrack { if mb.fss != nil { // Mark fss activity. - mb.lsts = getAccessTime() + mb.lsts = ats.AccessTime() } return nil } @@ -9513,6 +9537,9 @@ func (fs *fileStore) stop(delete, writeState bool) error { cb(0, -bytes, 0, _EMPTY_) } + // Unregister from the access time service. + ats.Unregister() + return nil } @@ -11135,27 +11162,3 @@ func writeFileWithSync(name string, data []byte, perm fs.FileMode) error { } return f.Close() } - -// This is to offload UnixNano() processing from timestamp creation for cache management. -var ( - tsOnce sync.Once - accessTime atomic.Int64 -) - -// Update every 100ms. -const accessTimeTickInterval = 100 * time.Millisecond - -// Will load the access time from an atomic. We will also setup the Go routine -// to update this in one place. -func getAccessTime() int64 { - tsOnce.Do(func() { - accessTime.Store(time.Now().UnixNano()) - go func() { - ticker := time.NewTicker(accessTimeTickInterval) - for range ticker.C { - accessTime.Store(time.Now().UnixNano()) - } - }() - }) - return accessTime.Load() -} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/gateway.go b/vendor/github.com/nats-io/nats-server/v2/server/gateway.go index 6257e3f61d..ca0e247420 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/gateway.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/gateway.go @@ -424,38 +424,48 @@ func (s *Server) newGateway(opts *Options) error { func (g *srvGateway) updateRemotesTLSConfig(opts *Options) { g.Lock() defer g.Unlock() - - for _, ro := range opts.Gateway.Gateways { - if ro.Name == g.name { + // Instead of going over opts.Gateway.Gateways, which would include only + // explicit remotes, we are going to go through g.remotes. + for name, cfg := range g.remotes { + if name == g.name { continue } - if cfg, ok := g.remotes[ro.Name]; ok { - cfg.Lock() - // If TLS config is in remote, use that one, otherwise, - // use the TLS config from the main block. - if ro.TLSConfig != nil { - cfg.TLSConfig = ro.TLSConfig.Clone() - } else if opts.Gateway.TLSConfig != nil { - cfg.TLSConfig = opts.Gateway.TLSConfig.Clone() - } - - // Ensure that OCSP callbacks are always setup after a reload if needed. - mustStaple := opts.OCSPConfig != nil && opts.OCSPConfig.Mode == OCSPModeAlways - if mustStaple && opts.Gateway.TLSConfig != nil { - clientCB := opts.Gateway.TLSConfig.GetClientCertificate - verifyCB := opts.Gateway.TLSConfig.VerifyConnection - if mustStaple && cfg.TLSConfig != nil { - if clientCB != nil && cfg.TLSConfig.GetClientCertificate == nil { - cfg.TLSConfig.GetClientCertificate = clientCB - } - if verifyCB != nil && cfg.TLSConfig.VerifyConnection == nil { - cfg.TLSConfig.VerifyConnection = verifyCB - } + var ro *RemoteGatewayOpts + // We now need to go back and find the RemoteGatewayOpts but only if + // this remote is explicit (otherwise it won't be found). + if !cfg.isImplicit() { + for _, r := range opts.Gateway.Gateways { + if r.Name == name { + ro = r + break } } - - cfg.Unlock() } + cfg.Lock() + // If we have an `ro` (that means an explicitly defined remote gateway) + // and it has an explicit TLS config, use that one, otherwise (no explicit + // TLS config in the remote, or implicit remote), use the TLS config from + // the main block. + if ro != nil && ro.TLSConfig != nil { + cfg.TLSConfig = ro.TLSConfig.Clone() + } else if opts.Gateway.TLSConfig != nil { + cfg.TLSConfig = opts.Gateway.TLSConfig.Clone() + } + // Ensure that OCSP callbacks are always setup after a reload if needed. + mustStaple := opts.OCSPConfig != nil && opts.OCSPConfig.Mode == OCSPModeAlways + if mustStaple && opts.Gateway.TLSConfig != nil { + clientCB := opts.Gateway.TLSConfig.GetClientCertificate + verifyCB := opts.Gateway.TLSConfig.VerifyConnection + if mustStaple && cfg.TLSConfig != nil { + if clientCB != nil && cfg.TLSConfig.GetClientCertificate == nil { + cfg.TLSConfig.GetClientCertificate = clientCB + } + if verifyCB != nil && cfg.TLSConfig.VerifyConnection == nil { + cfg.TLSConfig.VerifyConnection = verifyCB + } + } + } + cfg.Unlock() } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go index e40fac2c05..ca710a4eaa 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go @@ -233,7 +233,6 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { type keyGen func(context []byte) ([]byte, error) // Return a key generation function or nil if encryption not enabled. -// keyGen defined in filestore.go - keyGen func(iv, context []byte) []byte func (s *Server) jsKeyGen(jsKey, info string) keyGen { if ek := jsKey; ek != _EMPTY_ { return func(context []byte) ([]byte, error) { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 7901f4a777..6c486391b1 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -6289,6 +6289,13 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su return } + // Don't allow updating if all peers are offline. + if s.allPeersOffline(osa.Group) { + resp.Error = NewJSStreamOfflineError() + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp), nil, errRespDelay) + return + } + // Update asset version metadata. setStaticStreamMetadata(cfg) @@ -7413,6 +7420,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } + // Don't allow updating if all peers are offline. + if s.allPeersOffline(ca.Group) { + resp.Error = NewJSConsumerOfflineError() + s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp), nil, errRespDelay) + return + } } else { // Initialize/update asset version metadata. // First time creating this consumer, or updating. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index 411ff714d2..4da9adff46 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -369,7 +369,7 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { } } if lmsg == nil { - return ms.state.FirstSeq + return ms.state.LastSeq + 1 } last := lmsg.ts @@ -641,7 +641,13 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { func (ms *memStore) AllLastSeqs() ([]uint64, error) { ms.mu.RLock() defer ms.mu.RUnlock() + return ms.allLastSeqsLocked() +} +// allLastSeqsLocked will return a sorted list of last sequences for all +// subjects, but won't take the lock to do it, to avoid the issue of compounding +// read locks causing a deadlock with a write lock. +func (ms *memStore) allLastSeqsLocked() ([]uint64, error) { if len(ms.msgs) == 0 { return nil, nil } @@ -685,7 +691,7 @@ func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed in // See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set. if maxSeq == 0 && maxAllowed <= 0 && ms.filterIsAll(filters) { - return ms.AllLastSeqs() + return ms.allLastSeqsLocked() } // Implied last sequence. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/store.go b/vendor/github.com/nats-io/nats-server/v2/server/store.go index 308d0b08ee..c97e3afd02 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/store.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/store.go @@ -55,7 +55,7 @@ var ( // while a snapshot is in progress. ErrStoreSnapshotInProgress = errors.New("snapshot in progress") // ErrMsgTooLarge is returned when a message is considered too large. - ErrMsgTooLarge = errors.New("message to large") + ErrMsgTooLarge = errors.New("message too large") // ErrStoreWrongType is for when you access the wrong storage type. ErrStoreWrongType = errors.New("wrong storage type") // ErrNoAckPolicy is returned when trying to update a consumer's acks with no ack policy. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index 65afec733f..8e8029e4ac 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -4542,8 +4542,15 @@ func (mset *stream) getDirectMulti(req *JSApiMsgGetRequest, reply string) { // If we have UpToTime set get the proper sequence. if req.UpToTime != nil { upToSeq = store.GetSeqFromTime((*req.UpToTime).UTC()) + // Avoid selecting a first sequence that will take us to before the stream first + // sequence, otherwise we can return messages after the supplied UpToTime. + if upToSeq <= mset.state().FirstSeq { + hdr := []byte("NATS/1.0 404 No Results\r\n\r\n") + mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + return + } // We need to back off one since this is used to determine start sequence normally, - // were as here we want it to be the ceiling. + // whereas here we want it to be the ceiling. upToSeq-- } // If not set, set to the last sequence and remember that for EOB. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go index 7cb23a56eb..add5d0a27f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go @@ -124,7 +124,7 @@ func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) { t.match(t.root, parts, _pre[:0], cb) } -// IterOrdered will walk all entries in the SubjectTree lexographically. The callback can return false to terminate the walk. +// IterOrdered will walk all entries in the SubjectTree lexicographically. The callback can return false to terminate the walk. func (t *SubjectTree[T]) IterOrdered(cb func(subject []byte, val *T) bool) { if t == nil || t.root == nil { return @@ -244,6 +244,10 @@ func (t *SubjectTree[T]) delete(np *node, subject []byte, si int) (*T, bool) { } // Not a leaf node. if bn := n.base(); len(bn.prefix) > 0 { + // subject could be shorter and would panic on bad index into subject slice. + if len(subject) < si+len(bn.prefix) { + return nil, false + } if !bytes.Equal(subject[si:si+len(bn.prefix)], bn.prefix) { return nil, false } @@ -377,7 +381,7 @@ func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subje } } -// Interal iter function to walk nodes in lexigraphical order. +// Internal iter function to walk nodes in lexicographical order. func (t *SubjectTree[T]) iter(n node, pre []byte, ordered bool, cb func(subject []byte, val *T) bool) bool { if n.isLeaf() { ln := n.(*leaf[T]) @@ -418,3 +422,29 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, ordered bool, cb func(subject } return true } + +// LazyIntersect iterates the smaller of the two provided subject trees and +// looks for matching entries in the other. It is lazy in that it does not +// aggressively optimize against repeated walks, but is considerably faster +// in most cases than intersecting against a potentially large sublist. +func LazyIntersect[TL, TR any](tl *SubjectTree[TL], tr *SubjectTree[TR], cb func([]byte, *TL, *TR)) { + if tl.root == nil || tr.root == nil { + return + } + // Iterate over the smaller tree to reduce the number of rounds. + if tl.Size() <= tr.Size() { + tl.IterFast(func(key []byte, v1 *TL) bool { + if v2, ok := tr.Find(key); ok { + cb(key, v1, v2) + } + return true + }) + } else { + tr.IterFast(func(key []byte, v2 *TR) bool { + if v1, ok := tl.Find(key); ok { + cb(key, v1, v2) + } + return true + }) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 28d9924866..0176a9cca2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -703,7 +703,7 @@ github.com/google/go-querystring/query # github.com/google/go-tika v0.3.1 ## explicit; go 1.11 github.com/google/go-tika/tika -# github.com/google/go-tpm v0.9.3 +# github.com/google/go-tpm v0.9.5 ## explicit; go 1.22 github.com/google/go-tpm/legacy/tpm2 github.com/google/go-tpm/tpmutil @@ -992,13 +992,14 @@ github.com/munnerz/goautoneg # github.com/nats-io/jwt/v2 v2.7.4 ## explicit; go 1.23.0 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.11.3 +# github.com/nats-io/nats-server/v2 v2.11.4 ## explicit; go 1.23.0 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/fastrand github.com/nats-io/nats-server/v2/internal/ldap github.com/nats-io/nats-server/v2/logger github.com/nats-io/nats-server/v2/server +github.com/nats-io/nats-server/v2/server/ats github.com/nats-io/nats-server/v2/server/avl github.com/nats-io/nats-server/v2/server/certidp github.com/nats-io/nats-server/v2/server/certstore