Merge pull request #914 from opencloud-eu/dependabot/go_modules/github.com/nats-io/nats-server/v2-2.11.4

build(deps): bump github.com/nats-io/nats-server/v2 from 2.11.3 to 2.11.4
This commit is contained in:
Ralf Haferkamp
2025-05-22 17:46:32 +02:00
committed by GitHub
18 changed files with 399 additions and 199 deletions

4
go.mod
View File

@@ -55,7 +55,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.3
github.com/nats-io/nats-server/v2 v2.11.4
github.com/nats-io/nats.go v1.42.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v1.0.6
@@ -215,7 +215,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.9.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/go-tpm v0.9.3 // indirect
github.com/google/go-tpm v0.9.5 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
github.com/google/renameio/v2 v2.0.0 // indirect
github.com/gookit/color v1.5.4 // indirect

8
go.sum
View File

@@ -527,8 +527,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/go-tika v0.3.1 h1:l+jr10hDhZjcgxFRfcQChRLo1bPXQeLFluMyvDhXTTA=
github.com/google/go-tika v0.3.1/go.mod h1:DJh5N8qxXIl85QkqmXknd+PeeRkUOTbvwyYf7ieDz6c=
github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -820,8 +820,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.3 h1:AbGtXxuwjo0gBroLGGr/dE0vf24kTKdRnBq/3z/Fdoc=
github.com/nats-io/nats-server/v2 v2.11.3/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY=
github.com/nats-io/nats-server/v2 v2.11.4 h1:oQhvy6He6ER926sGqIKBKuYHH4BGnUQCNb0Y5Qa+M54=
github.com/nats-io/nats-server/v2 v2.11.4/go.mod h1:jFnKKwbNeq6IfLHq+OMnl7vrFRihQ/MkhRbiWfjLdjU=
github.com/nats-io/nats.go v1.42.0 h1:ynIMupIOvf/ZWH/b2qda6WGKGNSjwOUutTpWRvAmhaM=
github.com/nats-io/nats.go v1.42.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=

View File

@@ -447,6 +447,7 @@ const (
CmdClear tpmutil.Command = 0x00000126
CmdHierarchyChangeAuth tpmutil.Command = 0x00000129
CmdDefineSpace tpmutil.Command = 0x0000012A
CmdPCRAllocate tpmutil.Command = 0x0000012B
CmdCreatePrimary tpmutil.Command = 0x00000131
CmdIncrementNVCounter tpmutil.Command = 0x00000134
CmdWriteNV tpmutil.Command = 0x00000137

View File

@@ -51,6 +51,10 @@ func encodeTPMLPCRSelection(sel ...PCRSelection) ([]byte, error) {
return tpmutil.Pack(uint32(0))
}
if len(sel) == 1 && len(sel[0].PCRs) == 0 && sel[0].Hash == 0 {
return tpmutil.Pack(uint32(0))
}
// PCR selection is a variable-size bitmask, where position of a set bit is
// the selected PCR index.
// Size of the bitmask in bytes is pre-pended. It should be at least
@@ -61,10 +65,6 @@ func encodeTPMLPCRSelection(sel ...PCRSelection) ([]byte, error) {
// 00000011 00000000 00000001 00000100
var retBytes []byte
for _, s := range sel {
if len(s.PCRs) == 0 {
return tpmutil.Pack(uint32(0))
}
ts := tpmsPCRSelection{
Hash: s.Hash,
Size: sizeOfPCRSelect,
@@ -1153,6 +1153,34 @@ func Clear(rw io.ReadWriter, handle tpmutil.Handle, auth AuthCommand) error {
return err
}
func encodePCRAllocate(handle tpmutil.Handle, auth AuthCommand, pcrSelection []PCRSelection) ([]byte, error) {
ah, err := tpmutil.Pack(handle)
if err != nil {
return nil, err
}
authEncoded, err := encodeAuthArea(auth)
if err != nil {
return nil, err
}
pcrEncoded, err := encodeTPMLPCRSelection(pcrSelection...)
if err != nil {
return nil, err
}
return concat(ah, authEncoded, pcrEncoded)
}
// PCRAllocate sets the desired PCR allocation of PCR and algorithms.
// The changes take effect once the TPM is restarted.
func PCRAllocate(rw io.ReadWriter, handle tpmutil.Handle, auth AuthCommand, pcrSelection []PCRSelection) error {
Cmd, err := encodePCRAllocate(handle, auth, pcrSelection)
if err != nil {
return err
}
_, err = runCommand(rw, TagSessions, CmdPCRAllocate, tpmutil.RawBytes(Cmd))
return err
}
func encodeHierarchyChangeAuth(handle tpmutil.Handle, auth AuthCommand, newAuth string) ([]byte, error) {
ah, err := tpmutil.Pack(handle)
if err != nil {

View File

@@ -5,7 +5,7 @@ Revision 1.1
Authors: Ivan Kozlovic, Lev Brouk
NATS Server currently supports most of MQTT 3.1.1. This document describes how
it is implementated.
it is implemented.
It is strongly recommended to review the [MQTT v3.1.1
specifications](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)

View File

@@ -0,0 +1,83 @@
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ats controls the go routines for the access time service.
// This allows more efficient unixnano operations for cache access times.
// We will have one per binary (usually per server).
package ats
import (
"sync/atomic"
"time"
)
// Update every 100ms for gathering access time in unix nano.
const TickInterval = 100 * time.Millisecond
var (
// Our unix nano time.
utime atomic.Int64
// How may registered users do we have, controls lifetime of Go routine.
refs atomic.Int64
// To signal the shutdown of the Go routine.
done chan struct{}
)
func init() {
// Initialize our done chan.
done = make(chan struct{}, 1)
}
// Register usage. This will happen on filestore creation.
func Register() {
if v := refs.Add(1); v == 1 {
// This is the first to register (could also go up and down),
// so spin up Go routine and grab initial time.
utime.Store(time.Now().UnixNano())
go func() {
ticker := time.NewTicker(TickInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
utime.Store(time.Now().UnixNano())
case <-done:
return
}
}
}()
}
}
// Unregister usage. We will shutdown the go routine if no more registered users.
func Unregister() {
if v := refs.Add(-1); v == 0 {
done <- struct{}{}
} else if v < 0 {
refs.Store(0)
panic("unbalanced unregister for access time state")
}
}
// Will load the access time from an atomic.
// If no one has registered this will return 0 or stale data.
// It is the responsibility of the user to properly register and unregister.
func AccessTime() int64 {
// Return last updated time.
v := utime.Load()
if v == 0 {
panic("access time service not running")
}
return v
}

View File

@@ -15,8 +15,10 @@ package server
import (
"bytes"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
@@ -6066,10 +6068,22 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi
}
if err != nil {
var detail string
var subjs []string
if ve, ok := err.(*tls.CertificateVerificationError); ok {
for _, cert := range ve.UnverifiedCertificates {
fp := sha256.Sum256(cert.Raw)
fph := hex.EncodeToString(fp[:])
subjs = append(subjs, fmt.Sprintf("%s SHA-256: %s", cert.Subject.String(), fph))
}
}
if len(subjs) > 0 {
detail = fmt.Sprintf(" (%s)", strings.Join(subjs, "; "))
}
if kind == CLIENT {
c.Errorf("TLS handshake error: %v", err)
c.Errorf("TLS handshake error: %v%s", err, detail)
} else {
c.Errorf("TLS %s handshake error: %v", typ, err)
c.Errorf("TLS %s handshake error: %v%s", typ, err, detail)
}
c.closeConnection(TLSHandshakeError)

View File

@@ -58,7 +58,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.11.3"
VERSION = "2.11.4"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -2976,6 +2976,11 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
TimeStamp: time.Now().UTC(),
PriorityGroups: priorityGroups,
}
// Reset redelivered for MaxDeliver 1. Redeliveries are disabled so must not report it (is confusing otherwise).
// The state does still keep track of these messages.
if o.cfg.MaxDeliver == 1 {
info.NumRedelivered = 0
}
if o.cfg.PauseUntil != nil {
p := *o.cfg.PauseUntil
if info.Paused = time.Now().Before(p); info.Paused {

View File

@@ -43,6 +43,7 @@ import (
"github.com/klauspost/compress/s2"
"github.com/minio/highwayhash"
"github.com/nats-io/nats-server/v2/server/ats"
"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nats-server/v2/server/stree"
"github.com/nats-io/nats-server/v2/server/thw"
@@ -367,7 +368,7 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
return newFileStoreWithCreated(fcfg, cfg, time.Now().UTC(), nil, nil)
}
func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time, prf, oldprf keyGen) (*fileStore, error) {
func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time, prf, oldprf keyGen) (fs *fileStore, err error) {
if cfg.Name == _EMPTY_ {
return nil, fmt.Errorf("name required")
}
@@ -409,7 +410,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
os.Remove(tmpfile.Name())
dios <- struct{}{}
fs := &fileStore{
fs = &fileStore{
fcfg: fcfg,
psim: stree.NewSubjectTree[psi](),
bim: make(map[uint32]*msgBlock),
@@ -421,6 +422,16 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
srv: fcfg.srv,
}
// Register with access time service.
ats.Register()
// If we error before completion make sure to cleanup.
defer func() {
if err != nil {
ats.Unregister()
}
}()
// Only create a THW if we're going to allow TTLs.
if cfg.AllowMsgTTL {
fs.ttls = thw.NewHashWheel()
@@ -475,18 +486,20 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}
// Check if our prior state remembers a last sequence past where we can see.
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
if prior.LastSeq > fs.state.LastSeq {
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
if fs.state.Msgs == 0 {
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
}
if _, err := fs.newMsgBlockForWrite(); err == nil {
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
if fs.ld != nil {
if _, err := fs.newMsgBlockForWrite(); err == nil {
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
return nil, err
}
} else {
return nil, err
}
} else {
return nil, err
}
}
// Since we recovered here, make sure to kick ourselves to write out our stream state.
@@ -503,7 +516,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Also make sure we get rid of old idx and fss files on return.
// Do this in separate go routine vs inline and at end of processing.
defer func() {
go fs.cleanupOldMeta()
if fs != nil {
go fs.cleanupOldMeta()
}
}()
// Lock while we do enforcements and removals.
@@ -2353,7 +2368,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg)
var updateLLTS bool
defer func() {
if updateLLTS {
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
}
mb.mu.Unlock()
}()
@@ -2468,12 +2483,12 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
var updateLLTS bool
defer func() {
if updateLLTS {
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
}
mb.mu.Unlock()
}()
fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter}
fseq, isAll := start, filter == _EMPTY_ || filter == fwcs
var didLoad bool
if mb.fssNotLoaded() {
@@ -2482,7 +2497,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
didLoad = true
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
if filter == _EMPTY_ {
filter = fwcs
@@ -2501,18 +2516,15 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
}
}
// Make sure to start at mb.first.seq if fseq < mb.first.seq
if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq {
fseq = seq
}
fseq = max(fseq, atomic.LoadUint64(&mb.first.seq))
lseq := atomic.LoadUint64(&mb.last.seq)
// Optionally build the isMatch for wildcard filters.
_tsa, _fsa := [32]string{}, [32]string{}
tsa, fsa := _tsa[:0], _fsa[:0]
var isMatch func(subj string) bool
// Decide to build.
if wc {
fsa = tokenizeSubjectIntoSlice(fsa[:0], filter)
_tsa, _fsa := [32]string{}, [32]string{}
tsa, fsa := _tsa[:0], tokenizeSubjectIntoSlice(_fsa[:0], filter)
isMatch = func(subj string) bool {
tsa = tokenizeSubjectIntoSlice(tsa[:0], subj)
return isSubsetMatchTokenized(tsa, fsa)
@@ -2532,29 +2544,22 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
if !doLinearScan {
// If we have a wildcard match against all tracked subjects we know about.
if wc {
subs = subs[:0]
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, _ *SimpleState) {
subs = append(subs, string(bsubj))
})
// Check if we matched anything
if len(subs) == 0 {
return nil, didLoad, ErrStoreMsgNotFound
}
}
fseq = lseq + 1
for _, subj := range subs {
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
if bfilter := stringToBytes(filter); wc {
mb.fss.Match(bfilter, func(bsubj []byte, ss *SimpleState) {
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(bytesToString(bsubj), ss)
}
if start <= ss.Last {
fseq = min(fseq, max(start, ss.First))
}
})
} else if ss, _ := mb.fss.Find(bfilter); ss != nil {
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(filter, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
}
if ss.First < start {
fseq = start
} else {
fseq = ss.First
if start <= ss.Last {
fseq = min(fseq, max(start, ss.First))
}
}
}
@@ -2563,13 +2568,6 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
return nil, didLoad, ErrStoreMsgNotFound
}
// If we guess to not do a linear scan, but the above resulted in alot of subs that will
// need to be checked for every scanned message, revert.
// TODO(dlc) - we could memoize the subs across calls.
if !doLinearScan && len(subs) > int(lseq-fseq) {
doLinearScan = true
}
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
@@ -2602,18 +2600,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
if isAll {
return fsm, expireOk, nil
}
if doLinearScan {
if wc && isMatch(sm.subj) {
return fsm, expireOk, nil
} else if !wc && fsm.subj == filter {
return fsm, expireOk, nil
}
} else {
for _, subj := range subs {
if fsm.subj == subj {
return fsm, expireOk, nil
}
}
if wc && isMatch(sm.subj) {
return fsm, expireOk, nil
} else if !wc && fsm.subj == filter {
return fsm, expireOk, nil
}
// If we are here we did not match, so put the llseq back.
mb.llseq = llseq
@@ -2996,7 +2986,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
shouldExpire = true
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
subj := string(bsubj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
@@ -3029,7 +3019,13 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
func (fs *fileStore) AllLastSeqs() ([]uint64, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
return fs.allLastSeqsLocked()
}
// allLastSeqsLocked will return a sorted list of last sequences for all
// subjects, but won't take the lock to do it, to avoid the issue of compounding
// read locks causing a deadlock with a write lock.
func (fs *fileStore) allLastSeqsLocked() ([]uint64, error) {
if fs.state.Msgs == 0 || fs.noTrackSubjects() {
return nil, nil
}
@@ -3100,7 +3096,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i
// See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set.
if maxSeq == 0 && maxAllowed <= 0 && fs.filterIsAll(filters) {
return fs.AllLastSeqs()
return fs.allLastSeqsLocked()
}
lastBlkIndex := len(fs.blks) - 1
@@ -3351,7 +3347,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
mb.tryForceExpireCacheLocked()
}
if updateLLTS {
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
}
mb.mu.Unlock()
return total, validThrough
@@ -3377,7 +3373,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
shouldExpire = true
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
var t uint64
var havePartial bool
@@ -3477,7 +3473,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
shouldExpire = true
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
adjust += ss.Msgs
@@ -3517,7 +3513,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
mb.tryForceExpireCacheLocked()
}
if updateLLTS {
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
}
mb.mu.Unlock()
}
@@ -3660,7 +3656,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
mb.tryForceExpireCacheLocked()
}
if updateLLTS {
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
}
mb.mu.Unlock()
return total, validThrough
@@ -3685,7 +3681,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
shouldExpire = true
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
var t uint64
var havePartial bool
@@ -3738,7 +3734,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
mb.tryForceExpireCacheLocked()
}
if updateLLTS {
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
}
mb.mu.Unlock()
total += t
@@ -3798,7 +3794,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
shouldExpire = true
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
adjust += ss.Msgs
})
@@ -3837,7 +3833,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
mb.tryForceExpireCacheLocked()
}
if updateLLTS {
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
}
mb.mu.Unlock()
}
@@ -3922,7 +3918,7 @@ func (mb *msgBlock) setupWriteCache(buf []byte) {
if fi != nil {
mb.cache.off = int(fi.Size())
}
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
mb.startCacheExpireTimer()
}
@@ -3953,7 +3949,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
mb.fss = stree.NewSubjectTree[SimpleState]()
// Set cache time to creation time to start.
mb.llts, mb.lwts = 0, getAccessTime()
mb.llts, mb.lwts = 0, ats.AccessTime()
// Remember our last sequence number.
atomic.StoreUint64(&mb.first.seq, fs.state.LastSeq+1)
atomic.StoreUint64(&mb.last.seq, fs.state.LastSeq)
@@ -4211,7 +4207,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
return
}
var needsRecord bool
nowts := getAccessTime()
nowts := ats.AccessTime()
mb.mu.Lock()
// If we are empty can just do meta.
@@ -4403,7 +4399,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
shouldExpire = true
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
bsubj := stringToBytes(subj)
if ss, ok := mb.fss.Find(bsubj); ok && ss != nil {
@@ -4492,11 +4488,18 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
var numMsgs uint64
// collect all that are not correct.
needAttention := make(map[string]*psi)
needAttention := stree.NewSubjectTree[uint64]()
fblk, lblk := uint32(math.MaxUint32), uint32(0)
fs.psim.IterFast(func(subj []byte, psi *psi) bool {
numMsgs += psi.total
if psi.total > maxMsgsPer {
needAttention[string(subj)] = psi
needAttention.Insert(subj, psi.total)
if psi.fblk < fblk {
fblk = psi.fblk
}
if psi.lblk > lblk {
lblk = psi.lblk
}
}
return true
})
@@ -4517,55 +4520,71 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
// Rebuild fs state too.
fs.rebuildStateLocked(nil)
// Need to redo blocks that need attention.
needAttention = make(map[string]*psi)
needAttention.Empty()
fblk, lblk = uint32(math.MaxUint32), uint32(0)
fs.psim.IterFast(func(subj []byte, psi *psi) bool {
if psi.total > maxMsgsPer {
needAttention[string(subj)] = psi
needAttention.Insert(subj, psi.total)
if psi.fblk < fblk {
fblk = psi.fblk
}
if psi.lblk > lblk {
lblk = psi.lblk
}
}
return true
})
}
// If nothing to do then stop.
if fblk == math.MaxUint32 {
return
}
// Collect all the msgBlks we alter.
blks := make(map[*msgBlock]struct{})
// For re-use below.
var sm StoreMsg
// Walk all subjects that need attention here.
for subj, info := range needAttention {
total, start, stop := info.total, info.fblk, info.lblk
for i := start; i <= stop; i++ {
mb := fs.bim[i]
if mb == nil {
continue
var fss *stree.SubjectTree[*SimpleState]
for i := fblk; i <= lblk; i++ {
mb := fs.bim[i]
if mb == nil {
continue
}
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
// It isn't safe to intersect mb.fss directly, because removeMsgViaLimits modifies it
// during the iteration, which can cause us to miss keys. We won't copy the entire
// SimpleState structs though but rather just take pointers for speed.
fss = fss.Empty()
mb.fss.IterFast(func(subject []byte, val *SimpleState) bool {
fss.Insert(subject, val)
return true
})
mb.mu.Unlock()
stree.LazyIntersect(needAttention, fss, func(subj []byte, total *uint64, ssptr **SimpleState) {
if ssptr == nil || total == nil {
return
}
// Grab the ss entry for this subject in case sparse.
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
ss := *ssptr
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.mu.Lock()
mb.recalculateForSubj(bytesToString(subj), ss)
mb.mu.Unlock()
}
mb.mu.Unlock()
if ss == nil {
continue
}
for seq := ss.First; seq <= ss.Last && total > maxMsgsPer; {
m, _, err := mb.firstMatching(subj, false, seq, &sm)
if err == nil {
seq = m.seq + 1
if removed, _ := fs.removeMsgViaLimits(m.seq); removed {
total--
blks[mb] = struct{}{}
}
} else {
// On error just do single increment.
seq++
for first := ss.First; *total > maxMsgsPer && first <= ss.Last; {
m, _, err := mb.firstMatching(bytesToString(subj), false, first, &sm)
if err != nil {
break
}
first = m.seq + 1
if removed, _ := fs.removeMsgViaLimits(m.seq); removed {
blks[mb] = struct{}{}
*total--
}
}
}
})
}
// Expire the cache if we can.
@@ -4699,7 +4718,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
// Set cache timestamp for last remove.
mb.lrts = getAccessTime()
mb.lrts = ats.AccessTime()
// Global stats
if fs.state.Msgs > 0 {
@@ -5489,7 +5508,7 @@ func (mb *msgBlock) expireCacheLocked() {
}
// Grab timestamp to compare.
tns := getAccessTime()
tns := ats.AccessTime()
// For the core buffer of messages, we care about reads and writes, but not removes.
bufts := mb.llts
@@ -5599,7 +5618,7 @@ func (fs *fileStore) expireMsgs() {
fs.mu.RLock()
maxAge := int64(fs.cfg.MaxAge)
minAge := getAccessTime() - maxAge
minAge := ats.AccessTime() - maxAge
rmcb := fs.rmcb
sdmcb := fs.sdmcb
sdmTTL := int64(fs.cfg.SubjectDeleteMarkerTTL.Seconds())
@@ -5616,7 +5635,7 @@ func (fs *fileStore) expireMsgs() {
if len(sm.hdr) > 0 {
if ttl, err := getMessageTTL(sm.hdr); err == nil && ttl < 0 {
// The message has a negative TTL, therefore it must "never expire".
minAge = getAccessTime() - maxAge
minAge = ats.AccessTime() - maxAge
continue
}
}
@@ -5633,7 +5652,7 @@ func (fs *fileStore) expireMsgs() {
fs.mu.Unlock()
}
// Recalculate in case we are expiring a bunch.
minAge = getAccessTime() - maxAge
minAge = ats.AccessTime() - maxAge
}
}
@@ -5882,7 +5901,7 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg
return err
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
@@ -6560,7 +6579,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
popFss = true
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
mb.ttls = 0
lbuf := uint32(len(buf))
@@ -6786,7 +6805,7 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
// Decide what we want to do with the buffer in hand. If we have load interest
// we will hold onto the whole thing, otherwise empty the buffer, possibly reusing it.
if ts := getAccessTime(); ts < mb.llts || (ts-mb.llts) <= int64(mb.cexp) {
if ts := ats.AccessTime(); ts < mb.llts || (ts-mb.llts) <= int64(mb.cexp) {
mb.cache.wp += lob
} else {
if cap(mb.cache.buf) <= maxBufReuse {
@@ -6949,7 +6968,7 @@ checkCache:
return nil
}
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
// FIXME(dlc) - We could be smarter here.
if buf, _ := mb.bytesPending(); len(buf) > 0 {
@@ -7138,7 +7157,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
// If we have a delete map check it.
if mb.dmap.Exists(seq) {
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
return nil, errDeletedMsg
}
@@ -7169,7 +7188,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
}
// Update cache activity.
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
@@ -7430,7 +7449,7 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err
return nil, err
}
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
var l uint64
// Optimize if subject is not a wildcard.
@@ -8332,8 +8351,13 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
} else {
// Make sure to sync changes.
smb.needSync = true
// Update fs first seq and time.
atomic.StoreUint64(&smb.first.seq, seq-1) // Just for start condition for selectNextFirst.
// Just for start condition for selectNextFirst.
if smb.first.seq < seq {
atomic.StoreUint64(&smb.first.seq, seq-1)
} else {
// selectNextFirst always adds 1, so need to subtract 1 here.
atomic.StoreUint64(&smb.first.seq, smb.first.seq-1)
}
smb.selectNextFirst()
fs.state.FirstSeq = atomic.LoadUint64(&smb.first.seq)
@@ -8941,7 +8965,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
}
// Create new one regardless.
mb.fss = stree.NewSubjectTree[SimpleState]()
mb.fss = mb.fss.Empty()
var smv StoreMsg
fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)
@@ -8975,7 +8999,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if mb.fss.Size() > 0 {
// Make sure we run the cache expire timer.
mb.llts = getAccessTime()
mb.llts = ats.AccessTime()
// Mark fss activity same as load time.
mb.lsts = mb.llts
mb.startCacheExpireTimer()
@@ -8989,7 +9013,7 @@ func (mb *msgBlock) ensurePerSubjectInfoLoaded() error {
if mb.fss != nil || mb.noTrack {
if mb.fss != nil {
// Mark fss activity.
mb.lsts = getAccessTime()
mb.lsts = ats.AccessTime()
}
return nil
}
@@ -9513,6 +9537,9 @@ func (fs *fileStore) stop(delete, writeState bool) error {
cb(0, -bytes, 0, _EMPTY_)
}
// Unregister from the access time service.
ats.Unregister()
return nil
}
@@ -11135,27 +11162,3 @@ func writeFileWithSync(name string, data []byte, perm fs.FileMode) error {
}
return f.Close()
}
// This is to offload UnixNano() processing from timestamp creation for cache management.
var (
tsOnce sync.Once
accessTime atomic.Int64
)
// Update every 100ms.
const accessTimeTickInterval = 100 * time.Millisecond
// Will load the access time from an atomic. We will also setup the Go routine
// to update this in one place.
func getAccessTime() int64 {
tsOnce.Do(func() {
accessTime.Store(time.Now().UnixNano())
go func() {
ticker := time.NewTicker(accessTimeTickInterval)
for range ticker.C {
accessTime.Store(time.Now().UnixNano())
}
}()
})
return accessTime.Load()
}

View File

@@ -424,38 +424,48 @@ func (s *Server) newGateway(opts *Options) error {
func (g *srvGateway) updateRemotesTLSConfig(opts *Options) {
g.Lock()
defer g.Unlock()
for _, ro := range opts.Gateway.Gateways {
if ro.Name == g.name {
// Instead of going over opts.Gateway.Gateways, which would include only
// explicit remotes, we are going to go through g.remotes.
for name, cfg := range g.remotes {
if name == g.name {
continue
}
if cfg, ok := g.remotes[ro.Name]; ok {
cfg.Lock()
// If TLS config is in remote, use that one, otherwise,
// use the TLS config from the main block.
if ro.TLSConfig != nil {
cfg.TLSConfig = ro.TLSConfig.Clone()
} else if opts.Gateway.TLSConfig != nil {
cfg.TLSConfig = opts.Gateway.TLSConfig.Clone()
}
// Ensure that OCSP callbacks are always setup after a reload if needed.
mustStaple := opts.OCSPConfig != nil && opts.OCSPConfig.Mode == OCSPModeAlways
if mustStaple && opts.Gateway.TLSConfig != nil {
clientCB := opts.Gateway.TLSConfig.GetClientCertificate
verifyCB := opts.Gateway.TLSConfig.VerifyConnection
if mustStaple && cfg.TLSConfig != nil {
if clientCB != nil && cfg.TLSConfig.GetClientCertificate == nil {
cfg.TLSConfig.GetClientCertificate = clientCB
}
if verifyCB != nil && cfg.TLSConfig.VerifyConnection == nil {
cfg.TLSConfig.VerifyConnection = verifyCB
}
var ro *RemoteGatewayOpts
// We now need to go back and find the RemoteGatewayOpts but only if
// this remote is explicit (otherwise it won't be found).
if !cfg.isImplicit() {
for _, r := range opts.Gateway.Gateways {
if r.Name == name {
ro = r
break
}
}
cfg.Unlock()
}
cfg.Lock()
// If we have an `ro` (that means an explicitly defined remote gateway)
// and it has an explicit TLS config, use that one, otherwise (no explicit
// TLS config in the remote, or implicit remote), use the TLS config from
// the main block.
if ro != nil && ro.TLSConfig != nil {
cfg.TLSConfig = ro.TLSConfig.Clone()
} else if opts.Gateway.TLSConfig != nil {
cfg.TLSConfig = opts.Gateway.TLSConfig.Clone()
}
// Ensure that OCSP callbacks are always setup after a reload if needed.
mustStaple := opts.OCSPConfig != nil && opts.OCSPConfig.Mode == OCSPModeAlways
if mustStaple && opts.Gateway.TLSConfig != nil {
clientCB := opts.Gateway.TLSConfig.GetClientCertificate
verifyCB := opts.Gateway.TLSConfig.VerifyConnection
if mustStaple && cfg.TLSConfig != nil {
if clientCB != nil && cfg.TLSConfig.GetClientCertificate == nil {
cfg.TLSConfig.GetClientCertificate = clientCB
}
if verifyCB != nil && cfg.TLSConfig.VerifyConnection == nil {
cfg.TLSConfig.VerifyConnection = verifyCB
}
}
}
cfg.Unlock()
}
}

View File

@@ -233,7 +233,6 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
type keyGen func(context []byte) ([]byte, error)
// Return a key generation function or nil if encryption not enabled.
// keyGen defined in filestore.go - keyGen func(iv, context []byte) []byte
func (s *Server) jsKeyGen(jsKey, info string) keyGen {
if ek := jsKey; ek != _EMPTY_ {
return func(context []byte) ([]byte, error) {

View File

@@ -6289,6 +6289,13 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
return
}
// Don't allow updating if all peers are offline.
if s.allPeersOffline(osa.Group) {
resp.Error = NewJSStreamOfflineError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp), nil, errRespDelay)
return
}
// Update asset version metadata.
setStaticStreamMetadata(cfg)
@@ -7413,6 +7420,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
// Don't allow updating if all peers are offline.
if s.allPeersOffline(ca.Group) {
resp.Error = NewJSConsumerOfflineError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp), nil, errRespDelay)
return
}
} else {
// Initialize/update asset version metadata.
// First time creating this consumer, or updating.

View File

@@ -369,7 +369,7 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
}
}
if lmsg == nil {
return ms.state.FirstSeq
return ms.state.LastSeq + 1
}
last := lmsg.ts
@@ -641,7 +641,13 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
func (ms *memStore) AllLastSeqs() ([]uint64, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.allLastSeqsLocked()
}
// allLastSeqsLocked will return a sorted list of last sequences for all
// subjects, but won't take the lock to do it, to avoid the issue of compounding
// read locks causing a deadlock with a write lock.
func (ms *memStore) allLastSeqsLocked() ([]uint64, error) {
if len(ms.msgs) == 0 {
return nil, nil
}
@@ -685,7 +691,7 @@ func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed in
// See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set.
if maxSeq == 0 && maxAllowed <= 0 && ms.filterIsAll(filters) {
return ms.AllLastSeqs()
return ms.allLastSeqsLocked()
}
// Implied last sequence.

View File

@@ -55,7 +55,7 @@ var (
// while a snapshot is in progress.
ErrStoreSnapshotInProgress = errors.New("snapshot in progress")
// ErrMsgTooLarge is returned when a message is considered too large.
ErrMsgTooLarge = errors.New("message to large")
ErrMsgTooLarge = errors.New("message too large")
// ErrStoreWrongType is for when you access the wrong storage type.
ErrStoreWrongType = errors.New("wrong storage type")
// ErrNoAckPolicy is returned when trying to update a consumer's acks with no ack policy.

View File

@@ -4542,8 +4542,15 @@ func (mset *stream) getDirectMulti(req *JSApiMsgGetRequest, reply string) {
// If we have UpToTime set get the proper sequence.
if req.UpToTime != nil {
upToSeq = store.GetSeqFromTime((*req.UpToTime).UTC())
// Avoid selecting a first sequence that will take us to before the stream first
// sequence, otherwise we can return messages after the supplied UpToTime.
if upToSeq <= mset.state().FirstSeq {
hdr := []byte("NATS/1.0 404 No Results\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
// We need to back off one since this is used to determine start sequence normally,
// were as here we want it to be the ceiling.
// whereas here we want it to be the ceiling.
upToSeq--
}
// If not set, set to the last sequence and remember that for EOB.

View File

@@ -124,7 +124,7 @@ func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) {
t.match(t.root, parts, _pre[:0], cb)
}
// IterOrdered will walk all entries in the SubjectTree lexographically. The callback can return false to terminate the walk.
// IterOrdered will walk all entries in the SubjectTree lexicographically. The callback can return false to terminate the walk.
func (t *SubjectTree[T]) IterOrdered(cb func(subject []byte, val *T) bool) {
if t == nil || t.root == nil {
return
@@ -244,6 +244,10 @@ func (t *SubjectTree[T]) delete(np *node, subject []byte, si int) (*T, bool) {
}
// Not a leaf node.
if bn := n.base(); len(bn.prefix) > 0 {
// subject could be shorter and would panic on bad index into subject slice.
if len(subject) < si+len(bn.prefix) {
return nil, false
}
if !bytes.Equal(subject[si:si+len(bn.prefix)], bn.prefix) {
return nil, false
}
@@ -377,7 +381,7 @@ func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subje
}
}
// Interal iter function to walk nodes in lexigraphical order.
// Internal iter function to walk nodes in lexicographical order.
func (t *SubjectTree[T]) iter(n node, pre []byte, ordered bool, cb func(subject []byte, val *T) bool) bool {
if n.isLeaf() {
ln := n.(*leaf[T])
@@ -418,3 +422,29 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, ordered bool, cb func(subject
}
return true
}
// LazyIntersect iterates the smaller of the two provided subject trees and
// looks for matching entries in the other. It is lazy in that it does not
// aggressively optimize against repeated walks, but is considerably faster
// in most cases than intersecting against a potentially large sublist.
func LazyIntersect[TL, TR any](tl *SubjectTree[TL], tr *SubjectTree[TR], cb func([]byte, *TL, *TR)) {
if tl.root == nil || tr.root == nil {
return
}
// Iterate over the smaller tree to reduce the number of rounds.
if tl.Size() <= tr.Size() {
tl.IterFast(func(key []byte, v1 *TL) bool {
if v2, ok := tr.Find(key); ok {
cb(key, v1, v2)
}
return true
})
} else {
tr.IterFast(func(key []byte, v2 *TR) bool {
if v1, ok := tl.Find(key); ok {
cb(key, v1, v2)
}
return true
})
}
}

5
vendor/modules.txt vendored
View File

@@ -703,7 +703,7 @@ github.com/google/go-querystring/query
# github.com/google/go-tika v0.3.1
## explicit; go 1.11
github.com/google/go-tika/tika
# github.com/google/go-tpm v0.9.3
# github.com/google/go-tpm v0.9.5
## explicit; go 1.22
github.com/google/go-tpm/legacy/tpm2
github.com/google/go-tpm/tpmutil
@@ -992,13 +992,14 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.7.4
## explicit; go 1.23.0
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.11.3
# github.com/nats-io/nats-server/v2 v2.11.4
## explicit; go 1.23.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand
github.com/nats-io/nats-server/v2/internal/ldap
github.com/nats-io/nats-server/v2/logger
github.com/nats-io/nats-server/v2/server
github.com/nats-io/nats-server/v2/server/ats
github.com/nats-io/nats-server/v2/server/avl
github.com/nats-io/nats-server/v2/server/certidp
github.com/nats-io/nats-server/v2/server/certstore