Merge pull request #754 from opencloud-eu/dependabot/go_modules/github.com/nats-io/nats-server/v2-2.11.2

build(deps): bump github.com/nats-io/nats-server/v2 from 2.11.1 to 2.11.2
This commit is contained in:
Ralf Haferkamp
2025-04-29 17:48:57 +02:00
committed by GitHub
29 changed files with 1857 additions and 950 deletions

4
go.mod
View File

@@ -55,7 +55,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.1
github.com/nats-io/nats-server/v2 v2.11.2
github.com/nats-io/nats.go v1.41.2
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
@@ -261,7 +261,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/jwt/v2 v2.7.3 // indirect
github.com/nats-io/jwt/v2 v2.7.4 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect

8
go.sum
View File

@@ -820,10 +820,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.11.1 h1:LwdauqMqMNhTxTN3+WFTX6wGDOKntHljgZ+7gL5HCnk=
github.com/nats-io/nats-server/v2 v2.11.1/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.2 h1:k5KBAuRpJW9qAF11Io2txNhR5m1KUmqVkalLAw2yLfk=
github.com/nats-io/nats-server/v2 v2.11.2/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY=
github.com/nats-io/nats.go v1.41.2 h1:5UkfLAtu/036s99AhFRlyNDI1Ieylb36qbGjJzHixos=
github.com/nats-io/nats.go v1.41.2/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=

View File

@@ -83,6 +83,7 @@ type Account struct {
exports exportMap
js *jsAccount
jsLimits map[string]JetStreamAccountLimits
nrgAccount string
limits
expired atomic.Bool
incomplete bool
@@ -249,7 +250,7 @@ type exportMap struct {
// For services we will also track the response mappings as well.
type importMap struct {
streams []*streamImport
services map[string]*serviceImport
services map[string][]*serviceImport
rrMap map[string][]*serviceRespEntry
}
@@ -299,10 +300,14 @@ func (a *Account) shallowCopy(na *Account) {
}
}
if a.imports.services != nil {
na.imports.services = make(map[string]*serviceImport)
na.imports.services = make(map[string][]*serviceImport)
for k, v := range a.imports.services {
si := *v
na.imports.services[k] = &si
sis := make([]*serviceImport, 0, len(v))
for _, si := range v {
csi := *si
sis = append(sis, &csi)
}
na.imports.services[k] = sis
}
}
if a.exports.streams != nil {
@@ -1189,9 +1194,11 @@ func (a *Account) TrackServiceExportWithSampling(service, results string, sampli
s.accounts.Range(func(k, v any) bool {
acc := v.(*Account)
acc.mu.Lock()
for _, im := range acc.imports.services {
if im != nil && im.acc.Name == a.Name && subjectIsSubsetMatch(im.to, service) {
im.latency = ea.latency
for _, ims := range acc.imports.services {
for _, im := range ims {
if im != nil && im.acc.Name == a.Name && subjectIsSubsetMatch(im.to, service) {
im.latency = ea.latency
}
}
}
acc.mu.Unlock()
@@ -1230,10 +1237,12 @@ func (a *Account) UnTrackServiceExport(service string) {
s.accounts.Range(func(k, v any) bool {
acc := v.(*Account)
acc.mu.Lock()
for _, im := range acc.imports.services {
if im != nil && im.acc.Name == a.Name {
if subjectIsSubsetMatch(im.to, service) {
im.latency, im.m1 = nil, nil
for _, ims := range acc.imports.services {
for _, im := range ims {
if im != nil && im.acc.Name == a.Name {
if subjectIsSubsetMatch(im.to, service) {
im.latency, im.m1 = nil, nil
}
}
}
}
@@ -1566,21 +1575,23 @@ func (a *Account) checkServiceImportsForCycles(from string, visited map[string]b
return ErrCycleSearchDepth
}
a.mu.RLock()
for _, si := range a.imports.services {
if SubjectsCollide(from, si.to) {
a.mu.RUnlock()
if visited[si.acc.Name] {
return ErrImportFormsCycle
for _, sis := range a.imports.services {
for _, si := range sis {
if SubjectsCollide(from, si.to) {
a.mu.RUnlock()
if visited[si.acc.Name] {
return ErrImportFormsCycle
}
// Push ourselves and check si.acc
visited[a.Name] = true
if subjectIsSubsetMatch(si.from, from) {
from = si.from
}
if err := si.acc.checkServiceImportsForCycles(from, visited); err != nil {
return err
}
a.mu.RLock()
}
// Push ourselves and check si.acc
visited[a.Name] = true
if subjectIsSubsetMatch(si.from, from) {
from = si.from
}
if err := si.acc.checkServiceImportsForCycles(from, visited); err != nil {
return err
}
a.mu.RLock()
}
}
a.mu.RUnlock()
@@ -1657,10 +1668,15 @@ func (a *Account) setServiceImportSharing(destination *Account, to string, check
if check && a.isClaimAccount() {
return fmt.Errorf("claim based accounts can not be updated directly")
}
for _, si := range a.imports.services {
if si.acc == destination && si.to == to {
si.share = allow
return nil
// We can't use getServiceImportForAccountLocked() here since we are looking
// for the service import with the si.to == to, which may not be the key
// for the service import in the map.
for _, sis := range a.imports.services {
for _, si := range sis {
if si.acc.Name == destination.Name && si.to == to {
si.share = allow
return nil
}
}
}
return fmt.Errorf("service import not found")
@@ -1750,19 +1766,60 @@ func (a *Account) removeRespServiceImport(si *serviceImport, reason rsiReason) {
dest.checkForReverseEntry(to, si, false)
}
// removeServiceImport will remove the route by subject.
func (a *Account) removeServiceImport(subject string) {
a.mu.Lock()
si, ok := a.imports.services[subject]
delete(a.imports.services, subject)
func (a *Account) getServiceImportForAccountLocked(dstAccName, subject string) *serviceImport {
sis, ok := a.imports.services[subject]
if !ok {
return nil
}
if len(sis) == 1 && sis[0].acc.Name == dstAccName {
return sis[0]
}
for _, si := range sis {
if si.acc.Name == dstAccName {
return si
}
}
return nil
}
// removeServiceImport will remove the route by subject.
func (a *Account) removeServiceImport(dstAccName, subject string) {
a.mu.Lock()
sis, ok := a.imports.services[subject]
if !ok {
a.mu.Unlock()
return
}
var si *serviceImport
if len(sis) == 1 {
si = sis[0]
if si.acc.Name != dstAccName {
si = nil
} else {
delete(a.imports.services, subject)
}
} else {
for i, esi := range sis {
if esi.acc.Name == dstAccName {
si = esi
last := len(sis) - 1
if i != last {
sis[i] = sis[last]
}
sis = sis[:last]
a.imports.services[subject] = sis
break
}
}
}
if si == nil {
a.mu.Unlock()
return
}
var sid []byte
c := a.ic
if ok && si != nil {
if a.ic != nil && si.sid != nil {
sid = si.sid
}
if c != nil && si.sid != nil {
sid = si.sid
}
a.mu.Unlock()
@@ -1945,9 +2002,9 @@ func (a *Account) serviceImportShadowed(from string) bool {
}
// Internal check to see if a service import exists.
func (a *Account) serviceImportExists(from string) bool {
func (a *Account) serviceImportExists(dstAccName, from string) bool {
a.mu.RLock()
dup := a.imports.services[from]
dup := a.getServiceImportForAccountLocked(dstAccName, from)
a.mu.RUnlock()
return dup != nil
}
@@ -1971,12 +2028,13 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
lat = se.latency
atrc = se.atrc
}
destAccName := dest.Name
dest.mu.RUnlock()
a.mu.Lock()
if a.imports.services == nil {
a.imports.services = make(map[string]*serviceImport)
} else if dup := a.imports.services[from]; dup != nil {
a.imports.services = make(map[string][]*serviceImport)
} else if dup := a.getServiceImportForAccountLocked(destAccName, from); dup != nil {
a.mu.Unlock()
return nil, fmt.Errorf("duplicate service import subject %q, previously used in import for account %q, subject %q",
from, dup.acc.Name, dup.to)
@@ -2015,11 +2073,13 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
share = claim.Share
}
si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, share, false, false, atrc, nil}
a.imports.services[from] = si
sis := a.imports.services[from]
sis = append(sis, si)
a.imports.services[from] = sis
a.mu.Unlock()
if err := a.addServiceImportSub(si); err != nil {
a.removeServiceImport(si.from)
a.removeServiceImport(destAccName, si.from)
return nil, err
}
return si, nil
@@ -2104,10 +2164,12 @@ func (a *Account) addServiceImportSub(si *serviceImport) error {
func (a *Account) removeAllServiceImportSubs() {
a.mu.RLock()
var sids [][]byte
for _, si := range a.imports.services {
if si.sid != nil {
sids = append(sids, si.sid)
si.sid = nil
for _, sis := range a.imports.services {
for _, si := range sis {
if si.sid != nil {
sids = append(sids, si.sid)
si.sid = nil
}
}
}
c := a.ic
@@ -2128,8 +2190,8 @@ func (a *Account) addAllServiceImportSubs() {
var sis [32]*serviceImport
serviceImports := sis[:0]
a.mu.RLock()
for _, si := range a.imports.services {
serviceImports = append(serviceImports, si)
for _, sis := range a.imports.services {
serviceImports = append(serviceImports, sis...)
}
a.mu.RUnlock()
for _, si := range serviceImports {
@@ -2804,13 +2866,13 @@ func (a *Account) streamActivationExpired(exportAcc *Account, subject string) {
}
// These are import service specific versions for when an activation expires.
func (a *Account) serviceActivationExpired(subject string) {
func (a *Account) serviceActivationExpired(dstAcc *Account, subject string) {
a.mu.RLock()
if a.expired.Load() || a.imports.services == nil {
a.mu.RUnlock()
return
}
si := a.imports.services[subject]
si := a.getServiceImportForAccountLocked(dstAcc.Name, subject)
if si == nil || si.invalid {
a.mu.RUnlock()
return
@@ -2834,7 +2896,7 @@ func (a *Account) activationExpired(exportAcc *Account, subject string, kind jwt
case jwt.Stream:
a.streamActivationExpired(exportAcc, subject)
case jwt.Service:
a.serviceActivationExpired(subject)
a.serviceActivationExpired(exportAcc, subject)
}
}
@@ -3086,7 +3148,9 @@ func (a *Account) expiredTimeout() {
// Collect the clients and expire them.
cs := a.getClients()
for _, c := range cs {
c.accountAuthExpired()
if !isInternalClient(c.kind) {
c.accountAuthExpired()
}
}
}
@@ -3331,9 +3395,10 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.imports.streams = nil
}
if a.imports.services != nil {
old.imports.services = make(map[string]*serviceImport, len(a.imports.services))
old.imports.services = make(map[string][]*serviceImport, len(a.imports.services))
for k, v := range a.imports.services {
old.imports.services[k] = v
sis := append([]*serviceImport(nil), v...)
old.imports.services[k] = sis
delete(a.imports.services, k)
}
}
@@ -3536,15 +3601,16 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
clients := map[*client]struct{}{}
// We need to check all accounts that have an import claim from this account.
awcsti := map[string]struct{}{}
// We must only allow one goroutine to go through here, otherwise we could deadlock
// due to locking two accounts in succession.
s.mu.Lock()
s.accounts.Range(func(k, v any) bool {
acc := v.(*Account)
// Move to the next if this account is actually account "a".
if acc.Name == a.Name {
return true
}
// TODO: checkStreamImportAuthorized() stack should not be trying
// to lock "acc". If we find that to be needed, we will need to
// rework this to ensure we don't lock acc.
acc.mu.Lock()
for _, im := range acc.imports.streams {
if im != nil && im.acc.Name == a.Name {
@@ -3559,6 +3625,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
acc.mu.Unlock()
return true
})
s.mu.Unlock()
// Now walk clients.
for c := range clients {
c.processSubsOnConfigReload(awcsti)
@@ -3566,29 +3633,33 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
// Now check if service exports have changed.
if !a.checkServiceExportsEqual(old) || signersChanged || serviceTokenExpirationChanged {
// We must only allow one goroutine to go through here, otherwise we could deadlock
// due to locking two accounts in succession.
s.mu.Lock()
s.accounts.Range(func(k, v any) bool {
acc := v.(*Account)
// Move to the next if this account is actually account "a".
if acc.Name == a.Name {
return true
}
// TODO: checkServiceImportAuthorized() stack should not be trying
// to lock "acc". If we find that to be needed, we will need to
// rework this to ensure we don't lock acc.
acc.mu.Lock()
for _, si := range acc.imports.services {
if si != nil && si.acc.Name == a.Name {
// Check for if we are still authorized for an import.
si.invalid = !a.checkServiceImportAuthorized(acc, si.to, si.claim)
// Make sure we should still be tracking latency and if we
// are allowed to trace.
if !si.response {
if se := a.getServiceExport(si.to); se != nil {
if si.latency != nil {
si.latency = se.latency
for _, sis := range acc.imports.services {
for _, si := range sis {
if si != nil && si.acc.Name == a.Name {
// Check for if we are still authorized for an import.
si.invalid = !a.checkServiceImportAuthorized(acc, si.to, si.claim)
// Make sure we should still be tracking latency and if we
// are allowed to trace.
if !si.response {
a.mu.RLock()
if se := a.getServiceExport(si.to); se != nil {
if si.latency != nil {
si.latency = se.latency
}
// Update allow trace.
si.atrc = se.atrc
}
// Update allow trace.
si.atrc = se.atrc
a.mu.RUnlock()
}
}
}
@@ -3596,15 +3667,20 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
acc.mu.Unlock()
return true
})
s.mu.Unlock()
}
// Now make sure we shutdown the old service import subscriptions.
var sids [][]byte
a.mu.RLock()
c := a.ic
for _, si := range old.imports.services {
if c != nil && si.sid != nil {
sids = append(sids, si.sid)
if c != nil {
for _, sis := range old.imports.services {
for _, si := range sis {
if si.sid != nil {
sids = append(sids, si.sid)
}
}
}
}
a.mu.RUnlock()
@@ -3711,29 +3787,34 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
// Check whether the account NRG status changed. If it has then we need to notify the
// Raft groups running on the system so that they can move their subs if needed.
a.mu.Lock()
previous := ajs.nrgAccount
previous := a.nrgAccount
switch ac.ClusterTraffic {
case "system", _EMPTY_:
ajs.nrgAccount = _EMPTY_
a.nrgAccount = _EMPTY_
case "owner":
ajs.nrgAccount = a.Name
a.nrgAccount = a.Name
default:
s.Errorf("Account claim for %q has invalid value %q for cluster traffic account", a.Name, ac.ClusterTraffic)
}
changed := ajs.nrgAccount != previous
changed := a.nrgAccount != previous
a.mu.Unlock()
if changed {
s.updateNRGAccountStatus()
}
}
for i, c := range clients {
count := 0
for _, c := range clients {
a.mu.RLock()
exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns)
exceeded := a.mconns != jwt.NoLimit && count >= int(a.mconns)
a.mu.RUnlock()
if exceeded {
c.maxAccountConnExceeded()
continue
// Only kick non-internal clients.
if !isInternalClient(c.kind) {
if exceeded {
c.maxAccountConnExceeded()
continue
}
count++
}
c.mu.Lock()
c.applyAccountLimits()

View File

@@ -754,6 +754,9 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
// Check if we have trustedKeys defined in the server. If so we require a user jwt.
if s.trustedKeys != nil {
if c.opts.JWT == _EMPTY_ && opts.DefaultSentinel != _EMPTY_ {
c.opts.JWT = opts.DefaultSentinel
}
if c.opts.JWT == _EMPTY_ {
s.mu.Unlock()
c.Debugf("Authentication requires a user JWT")

View File

@@ -56,6 +56,11 @@ const (
ACCOUNT
)
// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT
func isInternalClient(kind int) bool {
return kind == SYSTEM || kind == JETSTREAM || kind == ACCOUNT
}
// Extended type of a CLIENT connection. This is returned by c.clientType()
// and indicate what type of client connection we are dealing with.
// If invoked on a non CLIENT connection, NON_CLIENT type is returned.
@@ -1897,12 +1902,29 @@ func (c *client) flushSignal() {
// Traces a message.
// Will NOT check if tracing is enabled, does NOT need the client lock.
func (c *client) traceMsg(msg []byte) {
maxTrace := c.srv.getOpts().MaxTracedMsgLen
if maxTrace > 0 && (len(msg)-LEN_CR_LF) > maxTrace {
opts := c.srv.getOpts()
maxTrace := opts.MaxTracedMsgLen
headersOnly := opts.TraceHeaders
suffix := LEN_CR_LF
// If TraceHeaders is enabled, extract only the header portion of the msg.
// If a header is present, it ends with an additional trailing CRLF.
if headersOnly {
msg, _ = c.msgParts(msg)
suffix += LEN_CR_LF
}
// Do not emit a log line for zero-length payloads.
l := len(msg) - suffix
if l <= 0 {
return
}
if maxTrace > 0 && l > maxTrace {
tm := fmt.Sprintf("%q", msg[:maxTrace])
c.Tracef("<<- MSG_PAYLOAD: [\"%s...\"]", tm[1:maxTrace+1])
c.Tracef("<<- MSG_PAYLOAD: [\"%s...\"]", tm[1:len(tm)-1])
} else {
c.Tracef("<<- MSG_PAYLOAD: [%q]", msg[:len(msg)-LEN_CR_LF])
c.Tracef("<<- MSG_PAYLOAD: [%q]", msg[:l])
}
}
@@ -1969,20 +1991,26 @@ func (c *client) processErr(errStr string) {
// Password pattern matcher.
var passPat = regexp.MustCompile(`"?\s*pass\S*?"?\s*[:=]\s*"?(([^",\r\n}])*)`)
var tokenPat = regexp.MustCompile(`"?\s*auth_token\S*?"?\s*[:=]\s*"?(([^",\r\n}])*)`)
// removePassFromTrace removes any notion of passwords from trace
// removeSecretsFromTrace removes any notion of passwords/tokens from trace
// messages for logging.
func removePassFromTrace(arg []byte) []byte {
if !bytes.Contains(arg, []byte(`pass`)) {
return arg
func removeSecretsFromTrace(arg []byte) []byte {
buf := redact("pass", passPat, arg)
return redact("auth_token", tokenPat, buf)
}
func redact(name string, pat *regexp.Regexp, proto []byte) []byte {
if !bytes.Contains(proto, []byte(name)) {
return proto
}
// Take a copy of the connect proto just for the trace message.
var _arg [4096]byte
buf := append(_arg[:0], arg...)
buf := append(_arg[:0], proto...)
m := passPat.FindAllSubmatchIndex(buf, -1)
m := pat.FindAllSubmatchIndex(buf, -1)
if len(m) == 0 {
return arg
return proto
}
redactedPass := []byte("[REDACTED]")
@@ -1993,7 +2021,7 @@ func removePassFromTrace(arg []byte) []byte {
start := i[2]
end := i[3]
// Replace password substring.
// Replace value substring.
buf = append(buf[:start], append(redactedPass, buf[end:]...)...)
break
}
@@ -2765,7 +2793,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw
// This check does not apply to SYSTEM or JETSTREAM or ACCOUNT clients (because they don't have a `nc`...)
// When a connection is closed though, we set c.subs to nil. So check for the map to not be nil.
if (c.isClosed() && (kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT)) || (c.subs == nil) {
if (c.isClosed() && !isInternalClient(kind)) || (c.subs == nil) {
c.mu.Unlock()
return nil, ErrConnectionClosed
}
@@ -4307,7 +4335,7 @@ func sliceHeader(key string, hdr []byte) []byte {
if len(hdr) == 0 {
return nil
}
index := bytes.Index(hdr, []byte(key))
index := bytes.Index(hdr, stringToBytes(key))
hdrLen := len(hdr)
// Check that we have enough characters, this will handle the -1 case of the key not
// being found and will also handle not having enough characters for trailing CRLF.
@@ -4663,11 +4691,21 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Check for JetStream encoded reply subjects.
// For now these will only be on $JS.ACK prefixed reply subjects.
var remapped bool
if len(creply) > 0 &&
c.kind != CLIENT && c.kind != SYSTEM && c.kind != JETSTREAM && c.kind != ACCOUNT &&
bytes.HasPrefix(creply, []byte(jsAckPre)) {
if len(creply) > 0 && c.kind != CLIENT && !isInternalClient(c.kind) && bytes.HasPrefix(creply, []byte(jsAckPre)) {
// We need to rewrite the subject and the reply.
if li := bytes.LastIndex(creply, []byte("@")); li != -1 && li < len(creply)-1 {
// But, we must be careful that the stream name, consumer name, and subject can contain '@' characters.
// JS ACK contains at least 8 dots, find the first @ after this prefix.
// - $JS.ACK.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>
counter := 0
li := bytes.IndexFunc(creply, func(rn rune) bool {
if rn == '.' {
counter++
} else if rn == '@' {
return counter >= 8
}
return false
})
if li != -1 && li < len(creply)-1 {
remapped = true
subj, creply = creply[li+1:], creply[:li]
}
@@ -6080,7 +6118,7 @@ func (c *client) getRawAuthUser() string {
case c.opts.JWT != _EMPTY_:
return c.pubKey
case c.opts.Token != _EMPTY_:
return c.opts.Token
return "[REDACTED]"
default:
return _EMPTY_
}
@@ -6097,7 +6135,7 @@ func (c *client) getAuthUser() string {
case c.opts.JWT != _EMPTY_:
return fmt.Sprintf("JWT User %q", c.pubKey)
case c.opts.Token != _EMPTY_:
return fmt.Sprintf("Token %q", c.opts.Token)
return fmt.Sprintf("Token %q", "[REDACTED]")
default:
return `User "N/A"`
}

View File

@@ -58,7 +58,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.11.1"
VERSION = "2.11.2"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -436,6 +436,7 @@ type consumer struct {
rdqi avl.SequenceSet
rdc map[uint64]uint64
replies map[uint64]string
pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum.
maxdc uint64
waiting *waitQueue
cfg ConsumerConfig
@@ -922,8 +923,12 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if cName != _EMPTY_ {
if eo, ok := mset.consumers[cName]; ok {
mset.mu.Unlock()
if action == ActionCreate && !reflect.DeepEqual(*config, eo.config()) {
return nil, NewJSConsumerAlreadyExistsError()
if action == ActionCreate {
ocfg := eo.config()
copyConsumerMetadata(config, &ocfg)
if !reflect.DeepEqual(config, &ocfg) {
return nil, NewJSConsumerAlreadyExistsError()
}
}
// Check for overlapping subjects if we are a workqueue
if cfg.Retention == WorkQueuePolicy {
@@ -1105,7 +1110,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if o.store != nil && o.store.HasState() {
// Restore our saved state.
o.mu.Lock()
o.readStoredState(0)
o.readStoredState()
o.mu.Unlock()
} else {
// Select starting sequence number
@@ -1366,7 +1371,7 @@ func (o *consumer) setLeader(isLeader bool) {
}
mset.mu.RLock()
s, jsa, stream, lseq := mset.srv, mset.jsa, mset.getCfgName(), mset.lseq
s, jsa, stream := mset.srv, mset.jsa, mset.getCfgName()
mset.mu.RUnlock()
o.mu.Lock()
@@ -1377,7 +1382,7 @@ func (o *consumer) setLeader(isLeader bool) {
// During non-leader status we just update our underlying store when not clustered.
// If clustered we need to propose our initial (possibly skipped ahead) o.sseq to the group.
if o.node == nil || o.dseq > 1 || (o.store != nil && o.store.HasState()) {
o.readStoredState(lseq)
o.readStoredState()
} else if o.node != nil && o.sseq >= 1 {
o.updateSkipped(o.sseq)
}
@@ -1529,6 +1534,7 @@ func (o *consumer) setLeader(isLeader bool) {
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
o.resetPendingDeliveries()
// ok if they are nil, we protect inside unsubscribe()
o.unsubscribe(o.ackSub)
o.unsubscribe(o.reqSub)
@@ -2166,6 +2172,11 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
if cfg.MaxAckPending != o.cfg.MaxAckPending {
o.maxp = cfg.MaxAckPending
o.signalNewMessages()
// If MaxAckPending is lowered, we could have allocated a pending deliveries map of larger size.
// Reset it here, so we can shrink the map.
if cfg.MaxAckPending < o.cfg.MaxAckPending {
o.resetPendingDeliveries()
}
}
// AckWait
if cfg.AckWait != o.cfg.AckWait {
@@ -2525,6 +2536,16 @@ func (o *consumer) addAckReply(sseq uint64, reply string) {
o.replies[sseq] = reply
}
// Used to remember messages that need to be sent for a replicated consumer, after delivered quorum.
// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
// Is not explicitly limited in size, but will at maximum hold maximum ack pending.
if o.pendingDeliveries == nil {
o.pendingDeliveries = make(map[uint64]*jsPubMsg)
}
o.pendingDeliveries[pmsg.seq] = pmsg
}
// Lock should be held.
func (o *consumer) updateAcks(dseq, sseq uint64, reply string) {
if o.node != nil {
@@ -2773,14 +2794,10 @@ func (o *consumer) ackWait(next time.Duration) time.Duration {
// Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check.
// Lock should be held.
func (o *consumer) checkRedelivered(slseq uint64) {
var lseq uint64
if mset := o.mset; mset != nil {
lseq = slseq
}
func (o *consumer) checkRedelivered() {
var shouldUpdateState bool
for sseq := range o.rdc {
if sseq <= o.asflr || (lseq > 0 && sseq > lseq) {
if sseq <= o.asflr {
delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
shouldUpdateState = true
@@ -2796,7 +2813,7 @@ func (o *consumer) checkRedelivered(slseq uint64) {
// This will restore the state from disk.
// Lock should be held.
func (o *consumer) readStoredState(slseq uint64) error {
func (o *consumer) readStoredState() error {
if o.store == nil {
return nil
}
@@ -2804,7 +2821,7 @@ func (o *consumer) readStoredState(slseq uint64) error {
if err == nil {
o.applyState(state)
if len(o.rdc) > 0 {
o.checkRedelivered(slseq)
o.checkRedelivered()
}
}
return err
@@ -3089,7 +3106,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}
// Check if this ack is above the current pointer to our next to deliver.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq >= o.sseq {
// Let's make sure this is valid.
// This is only received on the consumer leader, so should never be higher
@@ -3159,7 +3175,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
needSignal = true
}
sgap = sseq - o.asflr
floor = sgap // start at same and set lower as we go.
floor = sseq // start at same and set lower as we go.
o.adflr, o.asflr = dseq, sseq
remove := func(seq uint64) {
@@ -3596,7 +3612,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
priorityGroup = o.cfg.PriorityGroups[0]
}
lastRequest := o.waiting.tail
numCycled := 0
for wr := o.waiting.peek(); !o.waiting.isEmpty(); wr = o.waiting.peek() {
if wr == nil {
break
@@ -3650,7 +3666,8 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
// If we have a match, we do nothing here and will deliver the message later down the code path.
} else if wr.priorityGroup.Id == _EMPTY_ {
o.waiting.cycle()
if wr == lastRequest {
numCycled++
if numCycled >= o.waiting.len() {
return nil
}
continue
@@ -3672,8 +3689,9 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
(wr.priorityGroup.MinPending > 0 && wr.priorityGroup.MinPending > o.npc+1 ||
wr.priorityGroup.MinAckPending > 0 && wr.priorityGroup.MinAckPending > int64(len(o.pending))) {
o.waiting.cycle()
numCycled++
// We're done cycling through the requests.
if wr == lastRequest {
if numCycled >= o.waiting.len() {
return nil
}
continue
@@ -4840,7 +4858,14 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
}
// Send message.
o.outq.send(pmsg)
// If we're replicated we MUST only send the message AFTER we've got quorum for updating
// delivered state. Otherwise, we could be in an invalid state after a leader change.
// We can send immediately if not replicated, not using acks, or using flow control (incompatible).
if o.node == nil || ap == AckNone || o.cfg.FlowControl {
o.outq.send(pmsg)
} else {
o.addReplicatedQueuedMsg(pmsg)
}
// Flow control.
if o.maxpb > 0 && o.needFlowControl(psz) {
@@ -5017,9 +5042,9 @@ func (o *consumer) didNotDeliver(seq uint64, subj string) {
}
o.mu.Unlock()
// If we do not have interest update that here.
if checkDeliveryInterest && o.hasNoLocalInterest() {
o.updateDeliveryInterest(false)
if checkDeliveryInterest {
localInterest := !o.hasNoLocalInterest()
o.updateDeliveryInterest(localInterest)
}
}
@@ -5320,9 +5345,6 @@ func (o *consumer) selectStartingSeqNo() {
if mmp == 1 {
o.sseq = state.FirstSeq
} else {
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
@@ -5331,24 +5353,10 @@ func (o *consumer) selectStartingSeqNo() {
filters = append(filters, filter.subject)
}
}
for _, filter := range filters {
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
}
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
slices.Sort(lss.seqs)
}
lss := &lastSeqSkipList{resume: state.LastSeq}
lss.seqs, _ = o.mset.store.MultiLastSeqs(filters, 0, 0)
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
} else {
@@ -5765,7 +5773,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool) {
o.mu.Lock()
if !o.isLeader() {
o.readStoredState(0)
o.readStoredState()
}
start := o.asflr
o.mu.Unlock()
@@ -5872,7 +5880,10 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
// Check if this message was pending.
p, wasPending := o.pending[sseq]
rdc := o.deliveryCount(sseq)
var rdc uint64
if wasPending {
rdc = o.deliveryCount(sseq)
}
o.mu.Unlock()
@@ -6116,3 +6127,10 @@ func (o *consumer) stopAndClearPtmr() {
stopAndClearTimer(&o.ptmr)
o.ptmrEnd = time.Time{}
}
func (o *consumer) resetPendingDeliveries() {
for _, pmsg := range o.pendingDeliveries {
pmsg.returnToPool()
}
o.pendingDeliveries = nil
}

View File

@@ -21,9 +21,11 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"net/http"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync"
@@ -376,6 +378,8 @@ type ServerStats struct {
Gateways []*GatewayStat `json:"gateways,omitempty"`
ActiveServers int `json:"active_servers,omitempty"`
JetStream *JetStreamVarz `json:"jetstream,omitempty"`
MemLimit int64 `json:"gomemlimit,omitempty"`
MaxProcs int `json:"gomaxprocs,omitempty"`
}
// RouteStat holds route statistics.
@@ -821,6 +825,10 @@ func (s *Server) updateServerUsage(v *ServerStats) {
var vss int64
pse.ProcUsage(&v.CPU, &v.Mem, &vss)
v.Cores = runtime.NumCPU()
v.MaxProcs = runtime.GOMAXPROCS(-1)
if mm := debug.SetMemoryLimit(-1); mm < math.MaxInt64 {
v.MemLimit = mm
}
}
// Generate a route stat for our statz update.
@@ -2284,10 +2292,11 @@ func (s *Server) registerSystemImports(a *Account) {
if sacc == nil || sacc == a {
return
}
dstAccName := sacc.Name
// FIXME(dlc) - make a shared list between sys exports etc.
importSrvc := func(subj, mappedSubj string) {
if !a.serviceImportExists(subj) {
if !a.serviceImportExists(dstAccName, subj) {
if err := a.addServiceImportWithClaim(sacc, subj, mappedSubj, nil, true); err != nil {
s.Errorf("Error setting up system service import %s -> %s for account: %v",
subj, mappedSubj, err)
@@ -2824,8 +2833,6 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, _ *Account, s
si.rc = nil
acc.mu.Unlock()
// Make sure we remove the entry here.
acc.removeServiceImport(si.from)
// Send the metrics
s.sendInternalAccountMsg(acc, lsub, m1)
}

View File

File diff suppressed because it is too large Load Diff

View File

@@ -176,9 +176,6 @@ type jsAccount struct {
updatesSub *subscription
lupdate time.Time
utimer *time.Timer
// Which account to send NRG traffic into. Empty string is system account.
nrgAccount string
}
// Track general usage for this account.
@@ -715,7 +712,12 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error {
return fmt.Errorf("jetstream account not registered")
}
if !a.serviceImportExists(jsAllAPI) {
var dstAccName string
if sacc := s.SystemAccount(); sacc != nil {
dstAccName = sacc.Name
}
if !a.serviceImportExists(dstAccName, jsAllAPI) {
// Capture si so we can turn on implicit sharing with JetStream layer.
// Make sure to set "to" otherwise will incur performance slow down.
si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, jsAllAPI, nil)

View File

@@ -1997,19 +1997,22 @@ func (rg *raftGroup) setPreferred() {
}
// createRaftGroup is called to spin up this raft group if needed.
func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType, labels pprofLabels) error {
func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType, labels pprofLabels) (RaftNode, error) {
// Must hold JS lock throughout, otherwise two parallel calls for the same raft group could result
// in duplicate instances for the same identifier, if the current Raft node is shutting down.
// We can release the lock temporarily while waiting for the Raft node to shut down.
js.mu.Lock()
defer js.mu.Unlock()
s, cc := js.srv, js.cluster
if cc == nil || cc.meta == nil {
js.mu.Unlock()
return NewJSClusterNotActiveError()
return nil, NewJSClusterNotActiveError()
}
// If this is a single peer raft group or we are not a member return.
if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) {
js.mu.Unlock()
// Nothing to do here.
return nil
return nil, nil
}
// Check if we already have this assigned.
@@ -2037,20 +2040,35 @@ retry:
samePeers = slices.Equal(groupPeerIDs, nodePeerIDs)
}
if !samePeers {
// At this point we have no way of knowing:
// 1. Whether the group has lost enough nodes to cause a quorum
// loss, in which case a proposal may fail, therefore we will
// force a peerstate write;
// 2. Whether nodes in the group have other applies queued up
// that could change the peerstate again, therefore the leader
// should send out a new proposal anyway too just to make sure
// that this change gets captured in the log.
node.UpdateKnownPeers(groupPeerIDs)
// If the peers changed as a result of an update by the meta layer, we must reflect that in the log of
// this group. Otherwise, a new peer would come up and instantly reset the peer state back to whatever is
// in the log at that time, overwriting what the meta layer told it.
// Will need to address this properly later on, by for example having the meta layer decide the new
// placement, but have the leader of this group propose it through its own log instead.
if node.Leader() {
node.ProposeKnownPeers(groupPeerIDs)
}
}
rg.node = node
js.mu.Unlock()
return nil
return node, nil
}
s.Debugf("JetStream cluster creating raft group:%+v", rg)
js.mu.Unlock()
sysAcc := s.SystemAccount()
if sysAcc == nil {
s.Debugf("JetStream cluster detected shutdown processing raft group: %+v", rg)
return errors.New("shutting down")
return nil, errors.New("shutting down")
}
// Check here to see if we have a max HA Assets limit set.
@@ -2059,7 +2077,7 @@ retry:
s.Warnf("Maximum HA Assets limit reached: %d", maxHaAssets)
// Since the meta leader assigned this, send a statsz update to them to get them up to date.
go s.sendStatszUpdate()
return errors.New("system limit reached")
return nil, errors.New("system limit reached")
}
}
@@ -2080,14 +2098,14 @@ retry:
)
if err != nil {
s.Errorf("Error creating filestore WAL: %v", err)
return err
return nil, err
}
store = fs
} else {
ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage})
if err != nil {
s.Errorf("Error creating memstore WAL: %v", err)
return err
return nil, err
}
store = ms
}
@@ -2101,17 +2119,15 @@ retry:
n, err := s.startRaftNode(accName, cfg, labels)
if err != nil || n == nil {
s.Debugf("Error creating raft group: %v", err)
return err
return nil, err
}
// Need locking here for the assignment to avoid data-race reports
js.mu.Lock()
// Need JS lock to be held for the assignment to avoid data-race reports
rg.node = n
// See if we are preferred and should start campaign immediately.
if n.ID() == rg.Preferred && n.Term() == 0 {
n.Campaign()
n.CampaignImmediately()
}
js.mu.Unlock()
return nil
return n, nil
}
func (mset *stream) raftGroup() *raftGroup {
@@ -3735,7 +3751,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
js.mu.RUnlock()
// Process the raft group and make sure it's running if needed.
err := js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
_, err := js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
"type": "stream",
"account": acc.Name,
"stream": sa.Config.Name,
@@ -4335,28 +4351,18 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Check if we already have this consumer running.
o := mset.lookupConsumer(consumer)
if !alreadyRunning {
// Process the raft group and make sure its running if needed.
storage := mset.config().Storage
if ca.Config.MemoryStorage {
storage = MemoryStorage
}
// No-op if R1.
js.createRaftGroup(accName, rg, storage, pprofLabels{
"type": "consumer",
"account": mset.accName(),
"stream": ca.Stream,
"consumer": ca.Name,
})
} else {
// If we are clustered update the known peers.
js.mu.RLock()
node := rg.node
js.mu.RUnlock()
if node != nil {
node.UpdateKnownPeers(ca.Group.Peers)
}
// Process the raft group and make sure it's running if needed.
storage := mset.config().Storage
if ca.Config.MemoryStorage {
storage = MemoryStorage
}
// No-op if R1.
js.createRaftGroup(accName, rg, storage, pprofLabels{
"type": "consumer",
"account": mset.accName(),
"stream": ca.Stream,
"consumer": ca.Name,
})
// Check if we already have this consumer running.
var didCreate, isConfigUpdate, needsLocalResponse bool
@@ -4726,12 +4732,12 @@ func (o *consumer) raftNode() RaftNode {
}
func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
s, n, cc := js.server(), o.raftNode(), js.cluster
s, n, meta := js.server(), o.raftNode(), js.getMetaGroup()
defer s.grWG.Done()
defer o.clearMonitorRunning()
if n == nil {
if n == nil || meta == nil {
s.Warnf("No RAFT group for '%s > %s > %s'", o.acc.Name, ca.Stream, ca.Name)
return
}
@@ -4741,7 +4747,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
// from underneath the one that is running since it will be the same raft node.
defer n.Stop()
qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID()
qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), meta.ID()
s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
defer s.Debugf("Exiting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
@@ -4936,7 +4942,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
cca := ca.copyGroup()
cca.Group.Peers = newPeers
cca.Group.Cluster = s.cachedClusterName()
cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
meta.ForwardProposal(encodeAddConsumerAssignment(cca))
s.Noticef("Scaling down '%s > %s > %s' to %+v", ca.Client.serviceAccount(), ca.Stream, ca.Name, s.peerSetToNames(newPeers))
} else {
@@ -5037,6 +5043,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
o.mu.Lock()
err = o.store.UpdateDelivered(dseq, sseq, dc, ts)
o.ldt = time.Now()
// Need to send message to the client, since we have quorum to do so now.
if pmsg, ok := o.pendingDeliveries[sseq]; ok {
o.outq.send(pmsg)
delete(o.pendingDeliveries, sseq)
}
o.mu.Unlock()
if err != nil {
panic(err.Error())
@@ -5583,6 +5594,13 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
js.mu.Lock()
defer js.mu.Unlock()
if isLeader {
if meta := js.cluster.meta; meta != nil && meta.IsObserver() {
meta.StepDown()
return
}
}
if isLeader {
js.startUpdatesSub()
} else {
@@ -7956,6 +7974,26 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
return errStreamMismatch
}
// TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid.
if ttl, err := getMessageTTL(hdr); !sourced && (ttl != 0 || err != nil) {
if !allowTTL {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSMessageTTLDisabledError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return errMsgTTLDisabled
} else if err != nil {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSMessageTTLInvalidError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return err
}
}
// Check for MsgIds here at the cluster level to avoid excessive CLFS accounting.
// Will help during restarts.
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
@@ -7985,17 +8023,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.storeMsgIdLocked(&ddentry{msgId, 0, time.Now().UnixNano()})
mset.mu.Unlock()
}
// TTL'd messages are rejected entirely if TTLs are not enabled on the stream.
if ttl, _ := getMessageTTL(hdr); !sourced && ttl != 0 && !allowTTL {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSMessageTTLDisabledError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return errMsgTTLDisabled
}
}
// Proceed with proposing this message.

View File

@@ -69,6 +69,10 @@ func wipeSlice(buf []byte) {
// will expand the trusted keys in options.
func validateTrustedOperators(o *Options) error {
if len(o.TrustedOperators) == 0 {
// if we have no operator, default sentinel shouldn't be set
if o.DefaultSentinel != "" {
return fmt.Errorf("default sentinel requires operators and accounts")
}
return nil
}
if o.AccountResolver == nil {

View File

@@ -38,12 +38,13 @@ type memStore struct {
dmap avl.SequenceSet
maxp int64
scb StorageUpdateHandler
rmcb StorageRemoveMsgHandler
sdmcb SubjectDeleteMarkerUpdateHandler
ageChk *time.Timer
consumers int
receivedAny bool
ttls *thw.HashWheel
markers []string
sdm *SDMMeta
}
func newMemStore(cfg *StreamConfig) (*memStore, error) {
@@ -64,7 +65,7 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
ms.ttls = thw.NewHashWheel()
}
if cfg.FirstSeq > 0 {
if _, err := ms.purge(cfg.FirstSeq, true); err != nil {
if _, err := ms.purge(cfg.FirstSeq); err != nil {
return nil, err
}
}
@@ -332,6 +333,14 @@ func (ms *memStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
ms.mu.Unlock()
}
// RegisterStorageRemoveMsg registers a callback to remove messages.
// Replicated streams should propose removals, R1 can remove inline.
func (ms *memStore) RegisterStorageRemoveMsg(cb StorageRemoveMsgHandler) {
ms.mu.Lock()
ms.rmcb = cb
ms.mu.Unlock()
}
// RegisterSubjectDeleteMarkerUpdates registers a callback for updates to new subject delete markers.
func (ms *memStore) RegisterSubjectDeleteMarkerUpdates(cb SubjectDeleteMarkerUpdateHandler) {
ms.mu.Lock()
@@ -628,6 +637,44 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
return fss
}
// AllLastSeqs will return a sorted list of last sequences for all subjects.
func (ms *memStore) AllLastSeqs() ([]uint64, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if len(ms.msgs) == 0 {
return nil, nil
}
seqs := make([]uint64, 0, ms.fss.Size())
ms.fss.IterFast(func(subj []byte, ss *SimpleState) bool {
seqs = append(seqs, ss.Last)
return true
})
slices.Sort(seqs)
return seqs, nil
}
// Helper to determine if the filter(s) represent all the subjects.
// Most clients send in subjects even if they match the stream's ingest subjects.
// Lock should be held.
func (ms *memStore) filterIsAll(filters []string) bool {
if len(filters) != len(ms.cfg.Subjects) {
return false
}
// Sort so we can compare.
slices.Sort(filters)
for i, subj := range filters {
if !subjectIsSubsetMatch(ms.cfg.Subjects[i], subj) {
return false
}
}
return true
}
// MultiLastSeqs will return a sorted list of sequences that match all subjects presented in filters.
// We will not exceed the maxSeq, which if 0 becomes the store's last sequence.
func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
@@ -636,12 +683,16 @@ func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed in
return nil, nil
}
// See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set.
if maxSeq == 0 && maxAllowed <= 0 && ms.filterIsAll(filters) {
return ms.AllLastSeqs()
}
// Implied last sequence.
if maxSeq == 0 {
maxSeq = ms.state.LastSeq
}
//subs := make(map[string]*SimpleState)
seqs := make([]uint64, 0, 64)
seen := make(map[uint64]struct{})
@@ -680,7 +731,11 @@ func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed in
func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.subjectsTotalsLocked(filterSubject)
}
// Lock should be held.
func (ms *memStore) subjectsTotalsLocked(filterSubject string) map[string]uint64 {
if ms.fss.Size() == 0 {
return nil
}
@@ -872,7 +927,7 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
if !ms.removeMsg(ss.First, false, _EMPTY_) {
if !ms.removeMsg(ss.First, false) {
break
}
}
@@ -934,6 +989,15 @@ func (ms *memStore) resetAgeChk(delta int64) {
// Check to see if we should be firing sooner than MaxAge for an expiring TTL.
fireIn := ms.cfg.MaxAge
// If delta for next-to-expire message is unset, but we still have messages to remove.
// Assume messages are removed through proposals, and we need to speed up subsequent age check.
if delta == 0 && ms.state.Msgs > 0 {
if until := 2 * time.Second; until < fireIn {
fireIn = until
}
}
if next < math.MaxInt64 {
// Looks like there's a next expiration, use it either if there's no
// MaxAge set or if it looks to be sooner than MaxAge is.
@@ -970,69 +1034,6 @@ func (ms *memStore) cancelAgeChk() {
}
}
// Lock must be held so that nothing else can interleave and write a
// new message on this subject before we get the chance to write the
// delete marker. If the delete marker is written successfully then
// this function returns a callback func to call scb and sdmcb after
// the lock has been released.
func (ms *memStore) subjectDeleteMarkerIfNeeded(subj string, reason string) func() {
if ms.cfg.SubjectDeleteMarkerTTL <= 0 {
return nil
}
if _, ok := ms.fss.Find(stringToBytes(subj)); ok {
// There are still messages left with this subject,
// therefore it wasn't the last message deleted.
return nil
}
// Build the subject delete marker. If no TTL is specified then
// we'll default to 15 minutes — by that time every possible condition
// should have cleared (i.e. ordered consumer timeout, client timeouts,
// route/gateway interruptions, even device/client restarts etc).
ttl := int64(ms.cfg.SubjectDeleteMarkerTTL.Seconds())
if ttl <= 0 {
return nil
}
var _hdr [128]byte
hdr := fmt.Appendf(
_hdr[:0],
"NATS/1.0\r\n%s: %s\r\n%s: %s\r\n%s: %d\r\n%s: %s\r\n\r\n\r\n",
JSMarkerReason, reason,
JSMessageTTL, time.Duration(ttl)*time.Second,
JSExpectedLastSubjSeq, 0,
JSExpectedLastSubjSeqSubj, subj,
)
msg := &inMsg{
subj: subj,
hdr: hdr,
}
sdmcb := ms.sdmcb
return func() {
if sdmcb != nil {
sdmcb(msg)
}
}
}
// Memstore lock must be held. The caller should call the callback, if non-nil,
// after releasing the memstore lock.
func (ms *memStore) subjectDeleteMarkersAfterOperation(reason string) func() {
if ms.cfg.SubjectDeleteMarkerTTL <= 0 || len(ms.markers) == 0 {
return nil
}
cbs := make([]func(), 0, len(ms.markers))
for _, subject := range ms.markers {
if cb := ms.subjectDeleteMarkerIfNeeded(subject, reason); cb != nil {
cbs = append(cbs, cb)
}
}
ms.markers = nil
return func() {
for _, cb := range cbs {
cb()
}
}
}
// Will expire msgs that are too old.
func (ms *memStore) expireMsgs() {
var smv StoreMsg
@@ -1040,7 +1041,14 @@ func (ms *memStore) expireMsgs() {
ms.mu.RLock()
maxAge := int64(ms.cfg.MaxAge)
minAge := time.Now().UnixNano() - maxAge
rmcb := ms.rmcb
sdmcb := ms.sdmcb
sdmTTL := int64(ms.cfg.SubjectDeleteMarkerTTL.Seconds())
sdmEnabled := sdmTTL > 0
ms.mu.RUnlock()
if sdmEnabled && (rmcb == nil || sdmcb == nil) {
return
}
if maxAge > 0 {
var seq uint64
@@ -1052,9 +1060,16 @@ func (ms *memStore) expireMsgs() {
continue
}
}
ms.mu.Lock()
ms.removeMsg(seq, false, JSMarkerReasonMaxAge)
ms.mu.Unlock()
if sdmEnabled {
if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok {
sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0
ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
ms.mu.Lock()
ms.removeMsg(seq, false)
ms.mu.Unlock()
}
// Recalculate in case we are expiring a bunch.
minAge = time.Now().UnixNano() - maxAge
}
@@ -1065,9 +1080,27 @@ func (ms *memStore) expireMsgs() {
// TODO: Not great that we're holding the lock here, but the timed hash wheel isn't thread-safe.
nextTTL := int64(math.MaxInt64)
var rmSeqs []uint64
var ttlSdm map[string][]SDMBySubj
if ms.ttls != nil {
ms.ttls.ExpireTasks(func(seq uint64, ts int64) {
ms.removeMsg(seq, false, _EMPTY_)
ms.ttls.ExpireTasks(func(seq uint64, ts int64) bool {
if sdmEnabled {
// Need to grab subject for the specified sequence, and check
// if the message hasn't been removed in the meantime.
sm, _ = ms.loadMsgLocked(seq, &smv, false)
if sm != nil {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0})
return false
}
} else {
// Collect sequences to remove. Don't remove messages inline here,
// as that releases the lock and THW is not thread-safe.
rmSeqs = append(rmSeqs, seq)
}
return true
})
if maxAge > 0 {
// Only check if we're expiring something in the next MaxAge interval, saves us a bit
@@ -1078,6 +1111,34 @@ func (ms *memStore) expireMsgs() {
}
}
// Remove messages collected by THW.
for _, seq := range rmSeqs {
ms.removeMsg(seq, false)
}
// THW is unordered, so must sort by sequence and must not be holding the lock.
if len(ttlSdm) > 0 {
ms.mu.Unlock()
for subj, es := range ttlSdm {
slices.SortFunc(es, func(a, b SDMBySubj) int {
if a.seq == b.seq {
return 0
} else if a.seq < b.seq {
return -1
} else {
return 1
}
})
for _, e := range es {
if last, ok := ms.shouldProcessSdm(e.seq, subj); ok {
sdm := last && !e.sdm
ms.handleRemovalOrSdm(e.seq, subj, sdm, sdmTTL)
}
}
}
ms.mu.Lock()
}
// Only cancel if no message left, not on potential lookup error that would result in sm == nil.
if ms.state.Msgs == 0 && nextTTL == math.MaxInt64 {
ms.cancelAgeChk()
@@ -1090,19 +1151,72 @@ func (ms *memStore) expireMsgs() {
}
}
func (ms *memStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.sdm == nil {
ms.sdm = newSDMMeta()
}
if p, ok := ms.sdm.pending[seq]; ok {
// If we're about to use the cached value, and we knew it was last before,
// quickly check that we don't have more remaining messages for the subject now.
// Which means we are not the last anymore and must reset to not remove later data.
if p.last {
msgs := ms.subjectsTotalsLocked(subj)[subj]
numPending := ms.sdm.totals[subj]
if remaining := msgs - numPending; remaining > 0 {
p.last = false
}
}
// Don't allow more proposals for the same sequence if we already did recently.
if time.Since(time.Unix(0, p.ts)) < 2*time.Second {
return p.last, false
}
ms.sdm.pending[seq] = SDMBySeq{p.last, time.Now().UnixNano()}
return p.last, true
}
msgs := ms.subjectsTotalsLocked(subj)[subj]
if msgs == 0 {
return false, true
}
numPending := ms.sdm.totals[subj]
remaining := msgs - numPending
return ms.sdm.trackPending(seq, subj, remaining == 1), true
}
func (ms *memStore) handleRemovalOrSdm(seq uint64, subj string, sdm bool, sdmTTL int64) {
if sdm {
var _hdr [128]byte
hdr := fmt.Appendf(
_hdr[:0],
"NATS/1.0\r\n%s: %s\r\n%s: %s\r\n%s: %s\r\n\r\n",
JSMarkerReason, JSMarkerReasonMaxAge,
JSMessageTTL, time.Duration(sdmTTL)*time.Second,
JSMsgRollup, JSMsgRollupSubject,
)
msg := &inMsg{
subj: subj,
hdr: hdr,
}
ms.sdmcb(msg)
} else {
ms.rmcb(seq)
}
}
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64, _ /* noMarkers */ bool) (purged uint64, err error) {
// TODO: Don't write markers on purge until we have solved performance
// issues with them.
noMarkers := true
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && sequence == 0 {
return ms.purge(0, noMarkers)
return ms.purge(0)
}
if sequence > 1 {
return ms.compact(sequence, noMarkers)
return ms.compact(sequence)
} else if keep > 0 {
ms.mu.RLock()
msgs, lseq := ms.state.Msgs, ms.state.LastSeq
@@ -1110,7 +1224,7 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64, _ /* noMarker
if keep >= msgs {
return 0, nil
}
return ms.compact(lseq-keep+1, noMarkers)
return ms.compact(lseq - keep + 1)
}
return 0, nil
@@ -1128,13 +1242,9 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64, _ /* noMarker
last = sequence - 1
}
ms.mu.Lock()
var removeReason string
if !noMarkers {
removeReason = JSMarkerReasonPurge
}
for seq := ss.First; seq <= last; seq++ {
if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subject) {
if ok := ms.removeMsg(sm.seq, false, removeReason); ok {
if ok := ms.removeMsg(sm.seq, false); ok {
purged++
if purged >= ss.Msgs {
break
@@ -1150,22 +1260,17 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64, _ /* noMarker
// Purge will remove all messages from this store.
// Will return the number of purged messages.
func (ms *memStore) Purge() (uint64, error) {
ms.mu.RLock()
first := ms.state.LastSeq + 1
ms.mu.RUnlock()
return ms.purge(first, false)
return ms.purge(0)
}
func (ms *memStore) purge(fseq uint64, _ /* noMarkers */ bool) (uint64, error) {
// TODO: Don't write markers on purge until we have solved performance
// issues with them.
noMarkers := true
func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.mu.Lock()
purged := uint64(len(ms.msgs))
cb := ms.scb
bytes := int64(ms.state.Bytes)
if fseq < ms.state.LastSeq {
if fseq == 0 {
fseq = ms.state.LastSeq + 1
} else if fseq < ms.state.LastSeq {
ms.mu.Unlock()
return 0, fmt.Errorf("partial purges not supported on memory store")
}
@@ -1175,23 +1280,14 @@ func (ms *memStore) purge(fseq uint64, _ /* noMarkers */ bool) (uint64, error) {
ms.state.Bytes = 0
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*StoreMsg)
// Subject delete markers if needed.
if !noMarkers && ms.cfg.SubjectDeleteMarkerTTL > 0 {
ms.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
ms.markers = append(ms.markers, string(bsubj))
return true
})
}
ms.fss = stree.NewSubjectTree[SimpleState]()
sdmcb := ms.subjectDeleteMarkersAfterOperation(JSMarkerReasonPurge)
ms.dmap.Empty()
ms.sdm.empty()
ms.mu.Unlock()
if cb != nil {
cb(-int64(purged), -bytes, 0, _EMPTY_)
}
if sdmcb != nil {
sdmcb()
}
return purged, nil
}
@@ -1200,14 +1296,10 @@ func (ms *memStore) purge(fseq uint64, _ /* noMarkers */ bool) (uint64, error) {
// but not including the seq parameter.
// Will return the number of purged messages.
func (ms *memStore) Compact(seq uint64) (uint64, error) {
return ms.compact(seq, false)
return ms.compact(seq)
}
func (ms *memStore) compact(seq uint64, _ /* noMarkers */ bool) (uint64, error) {
// TODO: Don't write markers on compact until we have solved performance
// issues with them.
noMarkers := true
func (ms *memStore) compact(seq uint64) (uint64, error) {
if seq == 0 {
return ms.Purge()
}
@@ -1230,7 +1322,7 @@ func (ms *memStore) compact(seq uint64, _ /* noMarkers */ bool) (uint64, error)
if sm := ms.msgs[seq]; sm != nil {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
ms.removeSeqPerSubject(sm.subj, seq, !noMarkers && ms.cfg.SubjectDeleteMarkerTTL > 0)
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
} else if !ms.dmap.IsEmpty() {
@@ -1255,35 +1347,23 @@ func (ms *memStore) compact(seq uint64, _ /* noMarkers */ bool) (uint64, error)
ms.state.FirstSeq = seq
ms.state.FirstTime = time.Time{}
ms.state.LastSeq = seq - 1
// Subject delete markers if needed.
if !noMarkers && ms.cfg.SubjectDeleteMarkerTTL > 0 {
ms.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
ms.markers = append(ms.markers, string(bsubj))
return true
})
}
// Reset msgs, fss and dmap.
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.dmap.Empty()
ms.sdm.empty()
}
// Subject delete markers if needed.
sdmcb := ms.subjectDeleteMarkersAfterOperation(JSMarkerReasonPurge)
ms.mu.Unlock()
if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
if sdmcb != nil {
sdmcb()
}
return purged, nil
}
// Will completely reset our store.
func (ms *memStore) reset() error {
ms.mu.Lock()
var purged, bytes uint64
cb := ms.scb
@@ -1306,6 +1386,7 @@ func (ms *memStore) reset() error {
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.dmap.Empty()
ms.sdm.empty()
ms.mu.Unlock()
@@ -1336,7 +1417,7 @@ func (ms *memStore) Truncate(seq uint64) error {
if sm := ms.msgs[i]; sm != nil {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
ms.removeSeqPerSubject(sm.subj, i, false)
ms.removeSeqPerSubject(sm.subj, i)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, i)
} else if !ms.dmap.IsEmpty() {
@@ -1373,16 +1454,24 @@ func (ms *memStore) deleteFirstMsgOrPanic() {
}
func (ms *memStore) deleteFirstMsg() bool {
// TODO: Currently no markers for these types of limits (max msgs or max bytes)
return ms.removeMsg(ms.state.FirstSeq, false, _EMPTY_)
return ms.removeMsg(ms.state.FirstSeq, false)
}
// LoadMsg will lookup the message by sequence number and return it if found.
func (ms *memStore) LoadMsg(seq uint64, smp *StoreMsg) (*StoreMsg, error) {
ms.mu.RLock()
return ms.loadMsgLocked(seq, smp, true)
}
// loadMsgLocked will lookup the message by sequence number and return it if found.
func (ms *memStore) loadMsgLocked(seq uint64, smp *StoreMsg, needMSLock bool) (*StoreMsg, error) {
if needMSLock {
ms.mu.RLock()
}
sm, ok := ms.msgs[seq]
last := ms.state.LastSeq
ms.mu.RUnlock()
if needMSLock {
ms.mu.RUnlock()
}
if !ok || sm == nil {
var err = ErrStoreEOF
@@ -1570,8 +1659,7 @@ func (ms *memStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err
// Will return the number of bytes removed.
func (ms *memStore) RemoveMsg(seq uint64) (bool, error) {
ms.mu.Lock()
// TODO: Don't write markers on removes via the API yet, only via limits.
removed := ms.removeMsg(seq, false, _EMPTY_)
removed := ms.removeMsg(seq, false)
ms.mu.Unlock()
return removed, nil
}
@@ -1579,8 +1667,7 @@ func (ms *memStore) RemoveMsg(seq uint64) (bool, error) {
// EraseMsg will remove the message and rewrite its contents.
func (ms *memStore) EraseMsg(seq uint64) (bool, error) {
ms.mu.Lock()
// TODO: Don't write markers on removes via the API yet, only via limits.
removed := ms.removeMsg(seq, true, _EMPTY_)
removed := ms.removeMsg(seq, true)
ms.mu.Unlock()
return removed, nil
}
@@ -1620,17 +1707,15 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
// Remove a seq from the fss and select new first.
// Lock should be held.
func (ms *memStore) removeSeqPerSubject(subj string, seq uint64, marker bool) bool {
func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
ss, ok := ms.fss.Find(stringToBytes(subj))
if !ok {
return false
return
}
ms.sdm.removeSeqAndSubject(seq, subj)
if ss.Msgs == 1 {
ms.fss.Delete(stringToBytes(subj))
if marker {
ms.markers = append(ms.markers, subj)
}
return true
return
}
ss.Msgs--
@@ -1639,20 +1724,18 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64, marker bool) bo
if !ss.lastNeedsUpdate && seq != ss.Last {
ss.First = ss.Last
ss.firstNeedsUpdate = false
return false
return
}
if !ss.firstNeedsUpdate && seq != ss.First {
ss.Last = ss.First
ss.lastNeedsUpdate = false
return false
return
}
}
// We can lazily calculate the first/last sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
return false
}
// Will recalculate the first and/or last sequence for this subject.
@@ -1697,7 +1780,7 @@ func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) {
// Removes the message referenced by seq.
// Lock should be held.
func (ms *memStore) removeMsg(seq uint64, secure bool, marker string) bool {
func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
var ss uint64
sm, ok := ms.msgs[seq]
if !ok {
@@ -1729,29 +1812,18 @@ func (ms *memStore) removeMsg(seq uint64, secure bool, marker string) bool {
}
// Remove any per subject tracking.
needMarker := marker != _EMPTY_ && ms.cfg.SubjectDeleteMarkerTTL > 0 && len(getHeader(JSMarkerReason, sm.hdr)) == 0
wasLast := ms.removeSeqPerSubject(sm.subj, seq, needMarker)
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
// If the deleted message was itself a delete marker then
// don't write out more of them or we'll churn endlessly.
var sdmcb func()
if needMarker && wasLast {
sdmcb = ms.subjectDeleteMarkersAfterOperation(marker)
}
if ms.scb != nil || sdmcb != nil {
if ms.scb != nil {
// We do not want to hold any locks here.
ms.mu.Unlock()
if ms.scb != nil {
delta := int64(ss)
ms.scb(-1, -delta, seq, sm.subj)
}
if sdmcb != nil {
sdmcb()
}
ms.mu.Lock()
}
@@ -1945,7 +2017,7 @@ func (ms *memStore) SyncDeleted(dbs DeleteBlocks) {
continue
}
db.Range(func(seq uint64) bool {
ms.removeMsg(seq, false, _EMPTY_)
ms.removeMsg(seq, false)
return true
})
}

View File

@@ -23,12 +23,14 @@ import (
"encoding/json"
"expvar"
"fmt"
"math"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"runtime/debug"
"runtime/pprof"
"slices"
"sort"
@@ -1215,6 +1217,7 @@ type Varz struct {
Mem int64 `json:"mem"`
Cores int `json:"cores"`
MaxProcs int `json:"gomaxprocs"`
MemLimit int64 `json:"gomemlimit,omitempty"`
CPU float64 `json:"cpu"`
Connections int `json:"connections"`
TotalConnections uint64 `json:"total_connections"`
@@ -1605,6 +1608,9 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
TrustedOperatorsJwt: opts.operatorJWT,
TrustedOperatorsClaim: opts.TrustedOperators,
}
if mm := debug.SetMemoryLimit(-1); mm < math.MaxInt64 {
varz.MemLimit = mm
}
// If this is a leaf without cluster, reset the cluster name (that is otherwise
// set to the server name).
if s.leafNoCluster {
@@ -2727,8 +2733,10 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
}
imports = append(imports, imp)
}
for _, v := range a.imports.services {
imports = append(imports, newExtImport(v))
for _, sis := range a.imports.services {
for _, v := range sis {
imports = append(imports, newExtImport(v))
}
}
responses := map[string]ExtImport{}
for k, v := range a.exports.responses {
@@ -2867,20 +2875,19 @@ type MetaClusterInfo struct {
// JSInfo has detailed information on JetStream.
type JSInfo struct {
ID string `json:"server_id"`
Now time.Time `json:"now"`
Disabled bool `json:"disabled,omitempty"`
Config JetStreamConfig `json:"config,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
JetStreamStats
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Messages uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
// aggregate raft info
ID string `json:"server_id"`
Now time.Time `json:"now"`
Disabled bool `json:"disabled,omitempty"`
Config JetStreamConfig `json:"config,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Messages uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
Total int `json:"total"`
}
func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail {
@@ -3015,6 +3022,9 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
if opts == nil {
opts = &JSzOptions{}
}
if opts.Offset < 0 {
opts.Offset = 0
}
if opts.Limit == 0 {
opts.Limit = 1024
}
@@ -3059,6 +3069,8 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
}
js.mu.RUnlock()
jsi.Total = len(accounts)
if mg := js.getMetaGroup(); mg != nil {
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()}
@@ -3073,17 +3085,22 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
jsi.JetStreamStats = *js.usageStats()
// If a specific account is requested, track the index.
filterIdx := -1
// Calculate the stats of all accounts and streams regardless of the filtering.
for i, jsa := range accounts {
if jsa.acc().GetName() == opts.Account {
filterIdx = i
}
jsa.mu.RLock()
streams := make([]*stream, 0, len(jsa.streams))
for _, stream := range jsa.streams {
streams = append(streams, stream)
}
jsa.mu.RUnlock()
jsi.Streams += len(streams)
for _, stream := range streams {
streamState := stream.state()
@@ -3093,34 +3110,35 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
}
}
// filter logic
if filterIdx != -1 {
accounts = []*jsAccount{accounts[filterIdx]}
// Targeted account takes precedence.
if filterIdx >= 0 {
accounts = accounts[filterIdx : filterIdx+1]
} else if opts.Accounts {
if opts.Offset != 0 {
if opts.Limit > 0 {
// Sort by name for a consistent read (barring any concurrent changes)
slices.SortFunc(accounts, func(i, j *jsAccount) int { return cmp.Compare(i.acc().Name, j.acc().Name) })
if opts.Offset > len(accounts) {
accounts = []*jsAccount{}
} else {
accounts = accounts[opts.Offset:]
}
}
if opts.Limit != 0 {
if opts.Limit < len(accounts) {
accounts = accounts[:opts.Limit]
}
// Offset larger than the number of accounts.
offset := min(opts.Offset, len(accounts))
accounts = accounts[offset:]
limit := min(opts.Limit, len(accounts))
accounts = accounts[:limit]
}
} else {
accounts = []*jsAccount{}
accounts = nil
}
if len(accounts) > 0 {
jsi.AccountDetails = make([]*AccountDetail, 0, len(accounts))
for _, jsa := range accounts {
detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly)
jsi.AccountDetails = append(jsi.AccountDetails, detail)
}
}
// if wanted, obtain accounts/streams/consumer
for _, jsa := range accounts {
detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly)
jsi.AccountDetails = append(jsi.AccountDetails, detail)
}
return jsi, nil
}

View File

@@ -281,15 +281,17 @@ type AuthCallout struct {
// NOTE: This structure is no longer used for monitoring endpoints
// and json tags are deprecated and may be removed in the future.
type Options struct {
ConfigFile string `json:"-"`
ServerName string `json:"server_name"`
Host string `json:"addr"`
Port int `json:"port"`
DontListen bool `json:"dont_listen"`
ClientAdvertise string `json:"-"`
Trace bool `json:"-"`
Debug bool `json:"-"`
TraceVerbose bool `json:"-"`
ConfigFile string `json:"-"`
ServerName string `json:"server_name"`
Host string `json:"addr"`
Port int `json:"port"`
DontListen bool `json:"dont_listen"`
ClientAdvertise string `json:"-"`
Trace bool `json:"-"`
Debug bool `json:"-"`
TraceVerbose bool `json:"-"`
// TraceHeaders if true will only trace message headers, not the payload
TraceHeaders bool `json:"-"`
NoLog bool `json:"-"`
NoSigs bool `json:"-"`
NoSublistCache bool `json:"-"`
@@ -304,6 +306,7 @@ type Options struct {
Users []*User `json:"-"`
Accounts []*Account `json:"-"`
NoAuthUser string `json:"-"`
DefaultSentinel string `json:"-"`
SystemAccount string `json:"-"`
NoSystemAccount bool `json:"-"`
Username string `json:"-"`
@@ -1002,6 +1005,11 @@ func (o *Options) processConfigFileLine(k string, v any, errors *[]error, warnin
o.Trace = v.(bool)
trackExplicitVal(&o.inConfig, "TraceVerbose", o.TraceVerbose)
trackExplicitVal(&o.inConfig, "Trace", o.Trace)
case "trace_headers":
o.TraceHeaders = v.(bool)
o.Trace = v.(bool)
trackExplicitVal(&o.inConfig, "TraceHeaders", o.TraceHeaders)
trackExplicitVal(&o.inConfig, "Trace", o.Trace)
case "logtime":
o.Logtime = v.(bool)
trackExplicitVal(&o.inConfig, "Logtime", o.Logtime)
@@ -1024,8 +1032,10 @@ func (o *Options) processConfigFileLine(k string, v any, errors *[]error, warnin
*errors = append(*errors, err)
return
}
case "default_sentinel":
o.DefaultSentinel = v.(string)
case "authorization":
auth, err := parseAuthorization(tk, errors)
auth, err := parseAuthorization(tk, errors, warnings)
if err != nil {
*errors = append(*errors, err)
return
@@ -1802,7 +1812,7 @@ func parseCluster(v any, opts *Options, errors *[]error, warnings *[]error) erro
case "host", "net":
opts.Cluster.Host = mv.(string)
case "authorization":
auth, err := parseAuthorization(tk, errors)
auth, err := parseAuthorization(tk, errors, warnings)
if err != nil {
*errors = append(*errors, err)
continue
@@ -2038,7 +2048,7 @@ func parseGateway(v any, o *Options, errors *[]error, warnings *[]error) error {
case "host", "net":
o.Gateway.Host = mv.(string)
case "authorization":
auth, err := parseAuthorization(tk, errors)
auth, err := parseAuthorization(tk, errors, warnings)
if err != nil {
*errors = append(*errors, err)
continue
@@ -2186,9 +2196,9 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error {
}
switch vv {
case "system", _EMPTY_:
acc.js.nrgAccount = _EMPTY_
acc.nrgAccount = _EMPTY_
case "owner":
acc.js.nrgAccount = acc.Name
acc.nrgAccount = acc.Name
default:
return &configErr{tk, fmt.Sprintf("Expected 'system' or 'owner' string value for %q, got %v", mk, mv)}
}
@@ -2500,7 +2510,7 @@ func parseLeafNodes(v any, opts *Options, errors *[]error, warnings *[]error) er
case "host", "net":
opts.LeafNode.Host = mv.(string)
case "authorization":
auth, err := parseLeafAuthorization(tk, errors)
auth, err := parseLeafAuthorization(tk, errors, warnings)
if err != nil {
*errors = append(*errors, err)
continue
@@ -2579,7 +2589,7 @@ func parseLeafNodes(v any, opts *Options, errors *[]error, warnings *[]error) er
// This is the authorization parser adapter for the leafnode's
// authorization config.
func parseLeafAuthorization(v any, errors *[]error) (*authorization, error) {
func parseLeafAuthorization(v any, errors, warnings *[]error) (*authorization, error) {
var (
am map[string]any
tk token
@@ -2604,12 +2614,24 @@ func parseLeafAuthorization(v any, errors *[]error) (*authorization, error) {
}
auth.nkey = nk
case "timeout":
at := float64(1)
at := float64(0)
switch mv := mv.(type) {
case int64:
at = float64(mv)
case float64:
at = mv
case string:
d, err := time.ParseDuration(mv)
if err != nil {
return nil, &configErr{tk, fmt.Sprintf("error parsing leafnode authorization config, 'timeout' %s", err)}
}
at = d.Seconds()
default:
return nil, &configErr{tk, "error parsing leafnode authorization config, 'timeout' wrong type"}
}
if at > (60 * time.Second).Seconds() {
reason := fmt.Sprintf("timeout of %v (%f seconds) is high, consider keeping it under 60 seconds. possibly caused by unquoted duration; use '1m' instead of 1m, for example", mv, at)
*warnings = append(*warnings, &configWarningErr{field: mk, configErr: configErr{token: tk, reason: reason}})
}
auth.timeout = at
case "users":
@@ -3586,8 +3608,9 @@ func parseAccountImports(v any, acc *Account, errors *[]error) ([]*importStream,
var services []*importService
var streams []*importStream
svcSubjects := map[string]*importService{}
svcSubjects := map[string][]*importService{}
IMS_LOOP:
for _, v := range ims {
// Should have stream or service
stream, service, err := parseImportStreamOrService(v, errors)
@@ -3596,16 +3619,20 @@ func parseAccountImports(v any, acc *Account, errors *[]error) ([]*importStream,
continue
}
if service != nil {
if dup := svcSubjects[service.to]; dup != nil {
tk, _ := unwrapValue(v, &lt)
err := &configErr{tk,
fmt.Sprintf("Duplicate service import subject %q, previously used in import for account %q, subject %q",
service.to, dup.an, dup.sub)}
*errors = append(*errors, err)
continue
sisPerSubj := svcSubjects[service.to]
for _, dup := range sisPerSubj {
if dup.an == service.an {
tk, _ := unwrapValue(v, &lt)
err := &configErr{tk,
fmt.Sprintf("Duplicate service import subject %q, previously used in import for account %q, subject %q",
service.to, dup.an, dup.sub)}
*errors = append(*errors, err)
continue IMS_LOOP
}
}
svcSubjects[service.to] = service
service.acc = acc
sisPerSubj = append(sisPerSubj, service)
svcSubjects[service.to] = sisPerSubj
services = append(services, service)
}
if stream != nil {
@@ -4098,7 +4125,7 @@ func applyDefaultPermissions(users []*User, nkeys []*NkeyUser, defaultP *Permiss
}
// Helper function to parse Authorization configs.
func parseAuthorization(v any, errors *[]error) (*authorization, error) {
func parseAuthorization(v any, errors, warnings *[]error) (*authorization, error) {
var (
am map[string]any
tk token
@@ -4119,12 +4146,24 @@ func parseAuthorization(v any, errors *[]error) (*authorization, error) {
case "token":
auth.token = mv.(string)
case "timeout":
at := float64(1)
at := float64(0)
switch mv := mv.(type) {
case int64:
at = float64(mv)
case float64:
at = mv
case string:
d, err := time.ParseDuration(mv)
if err != nil {
return nil, &configErr{tk, fmt.Sprintf("error parsing authorization config, 'timeout' %s", err)}
}
at = d.Seconds()
default:
return nil, &configErr{tk, "error parsing authorization config, 'timeout' wrong type"}
}
if at > (60 * time.Second).Seconds() {
reason := fmt.Sprintf("timeout of %v (%f seconds) is high, consider keeping it under 60 seconds. possibly caused by unquoted duration; use '1m' instead of 1m, for example", mv, at)
*warnings = append(*warnings, &configWarningErr{field: mk, configErr: configErr{token: tk, reason: reason}})
}
auth.timeout = at
case "users":

View File

@@ -940,7 +940,7 @@ func (c *client) parse(buf []byte) error {
return err
}
if trace {
c.traceInOp("CONNECT", removePassFromTrace(arg))
c.traceInOp("CONNECT", removeSecretsFromTrace(arg))
}
if err := c.processConnect(arg); err != nil {
return err

View File

@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !amd64
//go:build cgo && freebsd
package pse

View File

@@ -1,4 +1,4 @@
// Copyright 2015-2020 The NATS Authors
// Copyright 2015-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -11,22 +11,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// This is the amd64-specific FreeBSD implementation, with hard-coded offset
// constants derived by running freebsd.txt; having this implementation allows
// us to compile without CGO, which lets us cross-compile for FreeBSD from our
// CI system and so supply binaries for FreeBSD amd64.
// There are two FreeBSD implementations; one which uses cgo and should build
// locally on any FreeBSD, and this one which uses sysctl but needs us to know
// the offset constants for the fields we care about.
//
// The advantage of this one is that without cgo, it is much easier to
// cross-compile to a target. The official releases are all built with
// cross-compilation.
//
// We've switched the other implementation to include '_cgo' in the filename,
// to show that it's not the default. This isn't an os or arch build tag,
// so we have to use explicit build-tags within.
// If lacking CGO support and targeting an unsupported arch, then before the
// change you would have a compile failure for not being able to cross-compile.
// After the change, you have a compile failure for not having the symbols
// because no source file satisfies them.
// Thus we are no worse off, and it's now much easier to extend support for
// non-CGO to new architectures, just by editing this file.
//
// To generate for other architectures:
// 1. Update pse_freebsd.go, change the build exclusion to exclude your arch
// 2. Copy this file to be built for your arch
// 3. Update `nativeEndian` below
// 4. Link `freebsd.txt` to have a .c filename and compile and run, then
// paste the outputs into the const section below.
// 1. Copy `freebsd.txt` to have a .c filename on a box running the target
// architecture, compile and run it.
// 2. Update the init() function below to include a case for this architecture
// 3. Update the build-tags in this file.
//go:build !cgo && freebsd && (amd64 || arm64)
package pse
import (
"encoding/binary"
"runtime"
"syscall"
"golang.org/x/sys/unix"
@@ -37,15 +52,34 @@ import (
// than little or big endian.
var nativeEndian = binary.LittleEndian
const (
KIP_OFF_size = 256
KIP_OFF_rssize = 264
KIP_OFF_pctcpu = 308
var pageshift int // derived from getpagesize(3) in init() below
var (
// These are populated in the init function, based on the current architecture.
// (It's less file-count explosion than having one small file for each
// FreeBSD architecture).
KIP_OFF_size int
KIP_OFF_rssize int
KIP_OFF_pctcpu int
)
var pageshift int
func init() {
switch runtime.GOARCH {
// These are the values which come from compiling and running
// freebsd.txt as a C program.
// Most recently validated: 2025-04 with FreeBSD 14.2R in AWS.
case "amd64":
KIP_OFF_size = 256
KIP_OFF_rssize = 264
KIP_OFF_pctcpu = 308
case "arm64":
KIP_OFF_size = 256
KIP_OFF_rssize = 264
KIP_OFF_pctcpu = 308
default:
panic("code bug: server/pse FreeBSD support missing case for '" + runtime.GOARCH + "' but build-tags allowed us to build anyway?")
}
// To get the physical page size, the C library checks two places:
// process ELF auxiliary info, AT_PAGESZ
// as a fallback, the hw.pagesize sysctl

View File

@@ -59,6 +59,7 @@ type RaftNode interface {
SetObserver(isObserver bool)
IsObserver() bool
Campaign() error
CampaignImmediately() error
ID() string
Group() string
Peers() []*Peer
@@ -89,7 +90,7 @@ type WAL interface {
RemoveMsg(index uint64) (bool, error)
Compact(index uint64) (uint64, error)
Purge() (uint64, error)
PurgeEx(subject string, seq, keep uint64, noMarkers bool) (uint64, error)
PurgeEx(subject string, seq, keep uint64) (uint64, error)
Truncate(seq uint64) error
State() StreamState
FastState(*StreamState)
@@ -153,7 +154,7 @@ type raft struct {
qn int // Number of nodes needed to establish quorum
peers map[string]*lps // Other peers in the Raft group
removed map[string]struct{} // Peers that were removed from the group
removed map[string]time.Time // Peers that were removed from the group
acks map[uint64]map[string]struct{} // Append entry responses/acks, map of entry index -> peer ID
pae map[uint64]*appendEntry // Pending append entries
@@ -252,6 +253,7 @@ const (
lostQuorumIntervalDefault = hbIntervalDefault * 10 // 10 seconds
lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds
observerModeIntervalDefault = 48 * time.Hour
peerRemoveTimeoutDefault = 5 * time.Minute
)
var (
@@ -263,6 +265,7 @@ var (
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
observerModeInterval = observerModeIntervalDefault
peerRemoveTimeout = peerRemoveTimeoutDefault
)
type RaftConfig struct {
@@ -600,7 +603,7 @@ func (n *raft) recreateInternalSubsLocked() error {
if a, _ := n.s.lookupAccount(n.accName); a != nil {
a.mu.RLock()
if a.js != nil {
target = a.js.nrgAccount
target = a.nrgAccount
}
a.mu.RUnlock()
}
@@ -891,9 +894,9 @@ func (n *raft) ProposeAddPeer(peer string) error {
func (n *raft) doRemovePeerAsLeader(peer string) {
n.Lock()
if n.removed == nil {
n.removed = map[string]struct{}{}
n.removed = map[string]time.Time{}
}
n.removed[peer] = struct{}{}
n.removed[peer] = time.Now()
if _, ok := n.peers[peer]; ok {
delete(n.peers, peer)
// We should decrease our cluster size since we are tracking this peer and the peer is most likely already gone.
@@ -1172,9 +1175,12 @@ func (n *raft) InstallSnapshot(data []byte) error {
return errNoSnapAvailable
}
term := n.pterm
var term uint64
if ae, _ := n.loadEntry(n.applied); ae != nil {
term = ae.term
} else {
n.debug("Not snapshotting as entry %d is not available", n.applied)
return errNoSnapAvailable
}
n.debug("Installing snapshot of %d bytes", len(data))
@@ -1639,7 +1645,14 @@ func (n *raft) StepDown(preferred ...string) error {
func (n *raft) Campaign() error {
n.Lock()
defer n.Unlock()
return n.campaign()
return n.campaign(randCampaignTimeout())
}
// CampaignImmediately will have our node start a leadership vote after minimal delay.
func (n *raft) CampaignImmediately() error {
n.Lock()
defer n.Unlock()
return n.campaign(minCampaignTimeout / 2)
}
func randCampaignTimeout() time.Duration {
@@ -1649,12 +1662,12 @@ func randCampaignTimeout() time.Duration {
// Campaign will have our node start a leadership vote.
// Lock should be held.
func (n *raft) campaign() error {
func (n *raft) campaign(et time.Duration) error {
n.debug("Starting campaign")
if n.State() == Leader {
return errAlreadyLeader
}
n.resetElect(randCampaignTimeout())
n.resetElect(et)
return nil
}
@@ -2957,9 +2970,9 @@ func (n *raft) applyCommit(index uint64) error {
// Make sure we have our removed map.
if n.removed == nil {
n.removed = make(map[string]struct{})
n.removed = make(map[string]time.Time)
}
n.removed[peer] = struct{}{}
n.removed[peer] = time.Now()
if _, ok := n.peers[peer]; ok {
delete(n.peers, peer)
@@ -3074,8 +3087,13 @@ func (n *raft) adjustClusterSizeAndQuorum() {
func (n *raft) trackPeer(peer string) error {
n.Lock()
var needPeerAdd, isRemoved bool
var rts time.Time
if n.removed != nil {
_, isRemoved = n.removed[peer]
rts, isRemoved = n.removed[peer]
// Removed peers can rejoin after timeout.
if isRemoved && time.Since(rts) >= peerRemoveTimeout {
isRemoved = false
}
}
if n.State() == Leader {
if lp, ok := n.peers[peer]; !ok || !lp.kp {
@@ -3261,12 +3279,13 @@ func (n *raft) truncateWAL(term, index uint64) {
if err := n.wal.Truncate(index); err != nil {
// If we get an invalid sequence, reset our wal all together.
// We will not have holes, so this means we do not have this message stored anymore.
// This is normal when truncating back to applied/snapshot.
if err == ErrInvalidSequence {
n.debug("Resetting WAL")
n.debug("Clearing WAL")
n.wal.Truncate(0)
// If our index is non-zero use PurgeEx to set us to the correct next index.
if index > 0 {
n.wal.PurgeEx(fwcs, index+1, 0, true)
n.wal.PurgeEx(fwcs, index+1, 0)
}
} else {
n.warn("Error truncating WAL: %v", err)
@@ -3454,6 +3473,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else if ae.pindex == n.applied {
// Entry can't be found, this is normal because we have a snapshot at this index.
// Truncate back to where we've created the snapshot.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
n.resetWAL()
}

View File

@@ -139,7 +139,7 @@ func (t *traceOption) Apply(server *Server) {
server.Noticef("Reloaded: trace = %v", t.newValue)
}
// traceOption implements the option interface for the `trace` setting.
// traceVersboseOption implements the option interface for the `trace_verbose` setting.
type traceVerboseOption struct {
traceLevelOption
newValue bool
@@ -150,6 +150,17 @@ func (t *traceVerboseOption) Apply(server *Server) {
server.Noticef("Reloaded: trace_verbose = %v", t.newValue)
}
// traceHeadersOption implements the option interface for the `trace_headers` setting.
type traceHeadersOption struct {
traceLevelOption
newValue bool
}
// Apply is a no-op because logging will be reloaded after options are applied.
func (t *traceHeadersOption) Apply(server *Server) {
server.Noticef("Reloaded: trace_headers = %v", t.newValue)
}
// debugOption implements the option interface for the `debug` setting.
type debugOption struct {
loggingOption
@@ -717,6 +728,15 @@ func (jso jetStreamOption) IsStatszChange() bool {
return true
}
type defaultSentinelOption struct {
noopOption
newValue string
}
func (so *defaultSentinelOption) Apply(s *Server) {
s.Noticef("Reloaded: default_sentinel = %s", so.newValue)
}
type ocspOption struct {
tlsOption
newValue *OCSPConfig
@@ -1231,6 +1251,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
switch optName {
case "traceverbose":
diffOpts = append(diffOpts, &traceVerboseOption{newValue: newValue.(bool)})
case "traceheaders":
diffOpts = append(diffOpts, &traceHeadersOption{newValue: newValue.(bool)})
case "trace":
diffOpts = append(diffOpts, &traceOption{newValue: newValue.(bool)})
case "debug":
@@ -1625,6 +1647,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
field.Name, oldValue, newValue)
}
case "defaultsentinel":
diffOpts = append(diffOpts, &defaultSentinelOption{newValue: newValue.(string)})
case "systemaccount":
if oldValue != DEFAULT_SYSTEM_ACCOUNT || newValue != _EMPTY_ {
return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
@@ -2279,7 +2303,7 @@ func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options)
}
// Otherwise get the route pool index it would have been before
// the move so we can send the protocol to those routes.
rpi = s.computeRoutePoolIdx(acc)
rpi = computeRoutePoolIdx(s.routesPoolSize, acc.Name)
}
acc.mu.Unlock()
// Generate the INFO protocol to send indicating that this account
@@ -2290,21 +2314,56 @@ func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options)
RouteAccReqID: s.accAddedReqID,
}
proto := generateInfoJSON(&ri)
// Go over each remote's route at pool index `rpi` and remove
// remote subs for this account and send the protocol.
s.forEachRouteIdx(rpi, func(r *client) bool {
// Since v2.11.0, we support remotes with a different pool size
// (for rolling upgrades), so we need to use the remote route
// pool index (based on the remote configured pool size) since
// the remote subscriptions will be attached to the route at
// that index, not at our account's route pool index. However,
// we are going to send the protocol through the route that
// handles this account from our pool size perspective (that
// would be the route at index `rpi`).
removeSubsAndSendProto := func(r *client, doSubs, doProto bool) {
r.mu.Lock()
defer r.mu.Unlock()
// Exclude routes to servers that don't support pooling.
if !r.route.noPool {
if r.route.noPool {
return
}
if doSubs {
if subs := r.removeRemoteSubsForAcc(an); len(subs) > 0 {
sl.RemoveBatch(subs)
}
}
if doProto {
r.enqueueProto(proto)
protosSent++
}
r.mu.Unlock()
return true
})
}
for remote, conns := range s.routes {
r := conns[rpi]
// The route connection at this index is currently not up,
// so we won't be able to send the protocol, so move to the
// next remote.
if r == nil {
continue
}
doSubs := true
// Check the remote's route pool size and if different than
// ours, remove the subs on that other route.
remotePoolSize, ok := s.remoteRoutePoolSize[remote]
if ok && remotePoolSize != s.routesPoolSize {
// This is the remote's route pool index for this account
rrpi := computeRoutePoolIdx(remotePoolSize, an)
if rr := conns[rrpi]; rr != nil {
removeSubsAndSendProto(rr, true, false)
// Indicate that we have already remove the subs.
doSubs = false
}
}
// Now send the protocol from the route that handles the
// account from this server perspective.
removeSubsAndSendProto(r, doSubs, true)
}
}
}
if protosSent > 0 {

View File

@@ -18,6 +18,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"hash/fnv"
"math/rand"
"net"
"net/url"
@@ -509,6 +510,21 @@ func (c *client) sendRouteConnect(clusterName string, tlsRequired bool) error {
return nil
}
// Returns a route pool index for this account based on the given pool size.
// If `poolSize` is smaller or equal to 1, the returned value will always
// be 0, regardless of the account name. If not, the returned value will
// be in the range [0..poolSize-1]. The value for a given account name
// is constant and same on all servers (given the same `poolSize` value).
func computeRoutePoolIdx(poolSize int, an string) int {
if poolSize <= 1 {
return 0
}
h := fnv.New32a()
h.Write([]byte(an))
sum32 := h.Sum32()
return int((sum32 % uint32(poolSize)))
}
// Process the info message if we are a route.
func (c *client) processRouteInfo(info *Info) {
@@ -592,13 +608,18 @@ func (c *client) processRouteInfo(info *Info) {
// to suppress possible remote subscription interest coming
// in while the transition is happening.
acc.routePoolIdx = accTransitioningToDedicatedRoute
} else if info.RoutePoolSize == s.routesPoolSize {
// Otherwise, and if the other side's pool size matches
// ours, get the route pool index that was handling this
// account.
rpi = s.computeRoutePoolIdx(acc)
}
acc.mu.Unlock()
// Since v2.11.0, we support remotes with a different pool size
// (for rolling upgrades), so we need to use the remote route
// pool index (based on the remote configured pool size) since
// the remote subscriptions will be attached to the route at
// that index, not at our account's route pool index. But we
// need to compute only if rpi is negative or the pool sizes
// are different.
if rpi <= 0 || info.RoutePoolSize != s.routesPoolSize {
rpi = computeRoutePoolIdx(info.RoutePoolSize, an)
}
// Go over each remote's route at pool index `rpi` and remove
// remote subs for this account.
s.forEachRouteIdx(rpi, func(r *client) bool {
@@ -2100,6 +2121,11 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod
// the first connection is established.
var noReconnectForOldServer bool
// To allow rolling updates, we now allow servers with different pool sizes
// so we will use as the effective pool size here, the max between our
// configured size and the size we receive in the info protocol.
effectivePoolSize := max(s.routesPoolSize, info.RoutePoolSize)
// If the remote is an old server, info.RoutePoolSize will be 0, or if
// this server's Cluster.PoolSize is negative, we will behave as an old
// server and need to handle things differently.
@@ -2119,15 +2145,12 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod
// sending subscriptions over routes.
s.routesNoPool++
}
} else if s.routesPoolSize != info.RoutePoolSize {
// The cluster's PoolSize configuration must be an exact match with the remote server.
invProtoErr = fmt.Sprintf("Mismatch route pool size: %v vs %v", s.routesPoolSize, info.RoutePoolSize)
} else if didSolicit {
// For solicited route, the incoming's RoutePoolIdx should not be set.
if info.RoutePoolIdx != 0 {
invProtoErr = fmt.Sprintf("Route pool index should not be set but is set to %v", info.RoutePoolIdx)
}
} else if info.RoutePoolIdx < 0 || info.RoutePoolIdx >= s.routesPoolSize {
} else if info.RoutePoolIdx < 0 || info.RoutePoolIdx >= effectivePoolSize {
// For non solicited routes, if the remote sends a RoutePoolIdx, make
// sure it is a valid one (in range of the pool size).
invProtoErr = fmt.Sprintf("Invalid route pool index: %v - pool size is %v", info.RoutePoolIdx, info.RoutePoolSize)
@@ -2196,9 +2219,9 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod
// Check if we know about the remote server
conns, exists := s.routes[id]
if !exists {
// No, create a slice for route connections of the size of the pool
// Now, create a slice for route connections of the size of the pool
// or 1 when not in pool mode.
conns = make([]*client, s.routesPoolSize)
conns = make([]*client, effectivePoolSize)
// Track this slice for this remote server.
s.routes[id] = conns
// Set the index to info.RoutePoolIdx because if this is a solicited
@@ -2206,6 +2229,19 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod
// will use whatever index the remote has chosen.
idx = info.RoutePoolIdx
} else if pool {
// The remote could have done a config reload and increased the pool size.
// It will close the connections before soliciting again, however, if
// on this side, one of the route is not yet fully removed, but the
// first one is, it would accept the new connection (with a greater pool
// size) and we would not go through the phase of `!exists` above creating
// the slice with the right size. So we need to check here and add new empty
// entries to complete the effective pool size.
if n := effectivePoolSize - len(conns); n > 0 {
for range n {
conns = append(conns, nil)
}
s.routes[id] = conns
}
// The remote was found. If this is a non solicited route, we will place
// the connection in the pool at the index given by info.RoutePoolIdx.
// But if there is already one, close this incoming connection as a
@@ -2267,6 +2303,19 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod
}
c.mu.Unlock()
// With pooling, we keep track of the remote's configured route pool size.
// We do so when adding the connection in the first slot, not when `sz == 1`
// because there could be situations where we have old connections that have
// not yet been removed and so we would not have `sz == `. However, we will
// always have the condition where we are adding the new connection at `idx==0`
// so use that as the condition to store the remote pool size.
if pool && idx == 0 {
if s.remoteRoutePoolSize == nil {
s.remoteRoutePoolSize = make(map[string]int)
}
s.remoteRoutePoolSize[id] = info.RoutePoolSize
}
// Add to the slice and bump the count of connections for this remote
conns[idx] = c
sz++
@@ -2323,7 +2372,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod
s.sendSubsToRoute(c, idx, _EMPTY_)
// In pool mode, if we did not yet reach the cap, try to connect a new connection
if pool && didSolicit && sz != s.routesPoolSize {
if pool && didSolicit && sz != effectivePoolSize {
s.startGoRoutine(func() {
select {
case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
@@ -3117,6 +3166,8 @@ func (s *Server) removeRoute(c *client) {
if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) {
s.sendAsyncLeafNodeInfo()
}
// We can remove the configured route pool size of this remote.
delete(s.remoteRoutePoolSize, rID)
// If this server has pooling/pinned accounts and the route for
// this remote was a "no pool" route, attempt to reconnect.
if noPool {

77
vendor/github.com/nats-io/nats-server/v2/server/sdm.go generated vendored Normal file
View File

@@ -0,0 +1,77 @@
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import "time"
// SDMMeta holds pending/proposed data for subject delete markers or message removals.
type SDMMeta struct {
totals map[string]uint64
pending map[uint64]SDMBySeq
}
// SDMBySeq holds data for a message with a specific sequence.
type SDMBySeq struct {
last bool // Whether the message for this sequence was the last for this subject.
ts int64 // Last timestamp we proposed a removal/sdm.
}
// SDMBySubj holds whether a message for a specific subject and sequence was a subject delete marker or not.
type SDMBySubj struct {
seq uint64
sdm bool
}
func newSDMMeta() *SDMMeta {
return &SDMMeta{
totals: make(map[string]uint64, 1),
pending: make(map[uint64]SDMBySeq, 1),
}
}
// empty clears all data.
func (sdm *SDMMeta) empty() {
if sdm == nil {
return
}
clear(sdm.totals)
clear(sdm.pending)
}
// trackPending caches the given seq and subj and whether it's the last message for that subject.
func (sdm *SDMMeta) trackPending(seq uint64, subj string, last bool) bool {
if p, ok := sdm.pending[seq]; ok {
return p.last
}
sdm.pending[seq] = SDMBySeq{last, time.Now().UnixNano()}
sdm.totals[subj]++
return last
}
// removeSeqAndSubject clears the seq and subj from the cache.
func (sdm *SDMMeta) removeSeqAndSubject(seq uint64, subj string) {
if sdm == nil {
return
}
if _, ok := sdm.pending[seq]; ok {
delete(sdm.pending, seq)
if msgs, ok := sdm.totals[subj]; ok {
if msgs <= 1 {
delete(sdm.totals, subj)
} else {
sdm.totals[subj] = msgs - 1
}
}
}
}

View File

@@ -21,7 +21,6 @@ import (
"errors"
"flag"
"fmt"
"hash/fnv"
"io"
"log"
"math/rand"
@@ -193,6 +192,7 @@ type Server struct {
accResolver AccountResolver
clients map[uint64]*client
routes map[string][]*client
remoteRoutePoolSize map[string]int // Map for remote's configure route pool size
routesPoolSize int // Configured pool size
routesReject bool // During reload, we may want to reject adding routes until some conditions are met
routesNoPool int // Number of routes that don't use pooling (connecting to older server for instance)
@@ -1180,9 +1180,11 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
// Collect the sids for the service imports since we are going to
// replace with new ones.
var sids [][]byte
for _, si := range a.imports.services {
if si.sid != nil {
sids = append(sids, si.sid)
for _, sis := range a.imports.services {
for _, si := range sis {
if si.sid != nil {
sids = append(sids, si.sid)
}
}
}
// Setup to process later if needed.
@@ -1297,19 +1299,21 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
si.acc = v.(*Account)
}
}
for _, si := range acc.imports.services {
if v, ok := s.accounts.Load(si.acc.Name); ok {
si.acc = v.(*Account)
for _, sis := range acc.imports.services {
for _, si := range sis {
if v, ok := s.accounts.Load(si.acc.Name); ok {
si.acc = v.(*Account)
// It is possible to allow for latency tracking inside your
// own account, so lock only when not the same account.
if si.acc == acc {
// It is possible to allow for latency tracking inside your
// own account, so lock only when not the same account.
if si.acc == acc {
si.se = si.acc.getServiceExport(si.to)
continue
}
si.acc.mu.RLock()
si.se = si.acc.getServiceExport(si.to)
continue
si.acc.mu.RUnlock()
}
si.acc.mu.RLock()
si.se = si.acc.getServiceExport(si.to)
si.acc.mu.RUnlock()
}
}
// Make sure the subs are running, but only if not reloading.
@@ -1756,7 +1760,7 @@ func (s *Server) setSystemAccount(acc *Account) error {
// locks on fast path for inbound messages and checking service imports.
acc.mu.Lock()
if acc.imports.services == nil {
acc.imports.services = make(map[string]*serviceImport)
acc.imports.services = make(map[string][]*serviceImport)
}
acc.mu.Unlock()
@@ -1833,9 +1837,9 @@ func (s *Server) createInternalAccountClient() *client {
return s.createInternalClient(ACCOUNT)
}
// Internal clients. kind should be SYSTEM or JETSTREAM
// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT
func (s *Server) createInternalClient(kind int) *client {
if kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT {
if !isInternalClient(kind) {
return nil
}
now := time.Now()
@@ -1968,27 +1972,13 @@ func (s *Server) setRouteInfo(acc *Account) {
// use modulo to assign to an index of the pool slice. For 1
// and below, all accounts will be bound to the single connection
// at index 0.
acc.routePoolIdx = s.computeRoutePoolIdx(acc)
acc.routePoolIdx = computeRoutePoolIdx(s.routesPoolSize, acc.Name)
if s.routesPoolSize > 1 {
s.accRouteByHash.Store(acc.Name, acc.routePoolIdx)
}
}
}
// Returns a route pool index for this account based on the given pool size.
// Account lock is held on entry (account's name is accessed but immutable
// so could be called without account's lock).
// Server lock held on entry.
func (s *Server) computeRoutePoolIdx(acc *Account) int {
if s.routesPoolSize <= 1 {
return 0
}
h := fnv.New32a()
h.Write([]byte(acc.Name))
sum32 := h.Sum32()
return int((sum32 % uint32(s.routesPoolSize)))
}
// lookupAccount is a function to return the account structure
// associated with an account name.
// Lock MUST NOT be held upon entry.

View File

@@ -46,7 +46,7 @@ func (s *Server) handleSignals() {
for {
select {
case sig := <-c:
s.Debugf("Trapped %q signal", sig)
s.Noticef("Trapped %q signal", sig)
switch sig {
case syscall.SIGINT:
s.Shutdown()

View File

@@ -84,6 +84,9 @@ type StoreMsg struct {
// For the cases where its a single message we will also supply sequence number and subject.
type StorageUpdateHandler func(msgs, bytes int64, seq uint64, subj string)
// Used to call back into the upper layers to remove a message.
type StorageRemoveMsgHandler func(seq uint64)
// Used to call back into the upper layers to report on newly created subject delete markers.
type SubjectDeleteMarkerUpdateHandler func(*inMsg)
@@ -100,13 +103,14 @@ type StreamStore interface {
RemoveMsg(seq uint64) (bool, error)
EraseMsg(seq uint64) (bool, error)
Purge() (uint64, error)
PurgeEx(subject string, seq, keep uint64, noMarkers bool) (uint64, error)
PurgeEx(subject string, seq, keep uint64) (uint64, error)
Compact(seq uint64) (uint64, error)
Truncate(seq uint64) error
GetSeqFromTime(t time.Time) uint64
FilteredState(seq uint64, subject string) SimpleState
SubjectsState(filterSubject string) map[string]SimpleState
SubjectsTotals(filterSubject string) map[string]uint64
AllLastSeqs() ([]uint64, error)
MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error)
NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64)
NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64)
@@ -116,6 +120,7 @@ type StreamStore interface {
SyncDeleted(dbs DeleteBlocks)
Type() StorageType
RegisterStorageUpdates(StorageUpdateHandler)
RegisterStorageRemoveMsg(handler StorageRemoveMsgHandler)
RegisterSubjectDeleteMarkerUpdates(SubjectDeleteMarkerUpdateHandler)
UpdateConfig(cfg *StreamConfig) error
Delete() error

View File

@@ -489,10 +489,11 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}
// Sensible defaults.
cfg, apiErr := s.checkStreamCfg(config, a, pedantic)
ccfg, apiErr := s.checkStreamCfg(config, a, pedantic)
if apiErr != nil {
return nil, apiErr
}
cfg := &ccfg
singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode()
if singleServerMode && cfg.Replicas > 1 {
@@ -533,7 +534,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
s.setIndexName()
}
if reflect.DeepEqual(ocfg, cfg) {
copyStreamMetadata(cfg, &ocfg)
if reflect.DeepEqual(cfg, &ocfg) {
if sa != nil {
mset.setStreamAssignment(sa)
}
@@ -547,7 +549,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.usageMu.RUnlock()
reserved := int64(0)
if !isClustered {
reserved = jsa.tieredReservation(tier, &cfg)
reserved = jsa.tieredReservation(tier, cfg)
}
jsa.mu.Unlock()
@@ -556,9 +558,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}
js.mu.RLock()
if isClustered {
_, reserved = tieredStreamAndReservationCount(js.cluster.streams[a.Name], tier, &cfg)
_, reserved = tieredStreamAndReservationCount(js.cluster.streams[a.Name], tier, cfg)
}
if err := js.checkAllLimits(&selected, &cfg, reserved, 0); err != nil {
if err := js.checkAllLimits(&selected, cfg, reserved, 0); err != nil {
js.mu.RUnlock()
return nil, err
}
@@ -651,7 +653,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
mset := &stream{
acc: a,
jsa: jsa,
cfg: cfg,
cfg: *cfg,
js: js,
srv: s,
client: c,
@@ -1385,14 +1387,23 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
}
if cfg.SubjectDeleteMarkerTTL > 0 {
if !cfg.AllowMsgTTL {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject marker delete cannot be set if message TTLs are disabled"))
}
if cfg.SubjectDeleteMarkerTTL < time.Second {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject marker delete TTL must be at least 1 second"))
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject delete marker TTL must be at least 1 second"))
}
if !cfg.AllowMsgTTL {
if pedantic {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject delete marker cannot be set if message TTLs are disabled"))
}
cfg.AllowMsgTTL = true
}
if !cfg.AllowRollup {
if pedantic {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject delete marker cannot be set if roll-ups are disabled"))
}
cfg.AllowRollup, cfg.DenyPurge = true, false
}
} else if cfg.SubjectDeleteMarkerTTL < 0 {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject marker delete TTL must not be negative"))
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject delete marker TTL must not be negative"))
}
getStream := func(streamName string) (bool, StreamConfig) {
@@ -1666,16 +1677,19 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
}
}
// Also check to make sure we do not overlap with our $JS API subjects.
if !cfg.NoAck && (subjectIsSubsetMatch(subj, "$JS.>") || subjectIsSubsetMatch(subj, "$JSC.>")) {
// We allow an exception for $JS.EVENT.> since these could have been created in the past.
if !subjectIsSubsetMatch(subj, "$JS.EVENT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with jetstream api require no-ack to be true"))
if !cfg.NoAck {
for _, namespace := range []string{"$JS.>", "$JSC.>", "$NRG.>"} {
if SubjectsCollide(subj, namespace) {
// We allow an exception for $JS.EVENT.> since these could have been created in the past.
if !subjectIsSubsetMatch(subj, "$JS.EVENT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with jetstream api require no-ack to be true"))
}
}
}
}
// And the $SYS subjects.
if !cfg.NoAck && subjectIsSubsetMatch(subj, "$SYS.>") {
if !subjectIsSubsetMatch(subj, "$SYS.ACCOUNT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with system api require no-ack to be true"))
if SubjectsCollide(subj, "$SYS.>") {
if !subjectIsSubsetMatch(subj, "$SYS.ACCOUNT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with system api require no-ack to be true"))
}
}
}
// Mark for duplicate check.
@@ -1830,12 +1844,12 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server, pedan
}
// Check on the allowed message TTL status.
if cfg.AllowMsgTTL != old.AllowMsgTTL {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("message TTL status can not be changed after stream creation"))
if old.AllowMsgTTL && !cfg.AllowMsgTTL {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("message TTL status can not be disabled"))
}
// Do some adjustments for being sealed.
// Pedantic mode will allow those changes to be made, as they are determinictic and important to get a sealed stream.
// Pedantic mode will allow those changes to be made, as they are deterministic and important to get a sealed stream.
if cfg.Sealed {
cfg.MaxAge = 0
cfg.Discard = DiscardNew
@@ -2223,7 +2237,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
mset.mu.RUnlock()
if preq != nil {
purged, err = mset.store.PurgeEx(preq.Subject, preq.Sequence, preq.Keep, false /*preq.NoMarkers*/)
purged, err = mset.store.PurgeEx(preq.Subject, preq.Sequence, preq.Keep)
} else {
purged, err = mset.store.Purge()
}
@@ -2265,7 +2279,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// or consumer filter subject is subset of purged subject,
// but not the other way around.
o.isEqualOrSubsetMatch(preq.Subject)
// Check if a consumer has a wider subject space then what we purged
// Check if a consumer has a wider subject space than what we purged
var isWider bool
if !doPurge && preq != nil && o.isFilteredMatch(preq.Subject) {
doPurge, isWider = true, true
@@ -2834,7 +2848,14 @@ func (mset *stream) setupMirrorConsumer() error {
}
mirror.sfs = sfs
mirror.trs = trs
req.Config.FilterSubjects = sfs
// If there was no explicit FilterSubject defined and we have a single
// subject transform, use Config.FilterSubject instead of FilterSubjects
// so that we can use the extended consumer create API down below.
if req.Config.FilterSubject == _EMPTY_ && len(sfs) == 1 {
req.Config.FilterSubject = sfs[0]
} else {
req.Config.FilterSubjects = sfs
}
}
respCh := make(chan *JSApiConsumerCreateResponse, 1)
@@ -2860,8 +2881,6 @@ func (mset *stream) setupMirrorConsumer() error {
return nil
}
b, _ := json.Marshal(req)
var subject string
if req.Config.FilterSubject != _EMPTY_ {
req.Config.Name = fmt.Sprintf("mirror-%s", createConsumerName())
@@ -2874,6 +2893,9 @@ func (mset *stream) setupMirrorConsumer() error {
subject = strings.ReplaceAll(subject, "..", ".")
}
// Marshal now that we are done with `req`.
b, _ := json.Marshal(req)
// Reset
mirror.msgs = nil
mirror.err = nil
@@ -2956,7 +2978,7 @@ func (mset *stream) setupMirrorConsumer() error {
// Check to see if delivered is past our last and we have no msgs. This will help the
// case when mirroring a stream that has a very high starting sequence number.
if state.Msgs == 0 && ccr.ConsumerInfo.Delivered.Stream > state.LastSeq {
mset.store.PurgeEx(_EMPTY_, ccr.ConsumerInfo.Delivered.Stream+1, 0, true)
mset.store.PurgeEx(_EMPTY_, ccr.ConsumerInfo.Delivered.Stream+1, 0)
mset.lseq = ccr.ConsumerInfo.Delivered.Stream
} else {
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
@@ -4113,6 +4135,18 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
}
// This will fire the callback but we do not require the lock since md will be 0 here.
mset.store.RegisterStorageUpdates(mset.storeUpdates)
mset.store.RegisterStorageRemoveMsg(func(seq uint64) {
if mset.IsClustered() {
if mset.IsLeader() {
mset.mu.RLock()
md := streamMsgDelete{Seq: seq, NoErase: true, Stream: mset.cfg.Name}
mset.node.Propose(encodeMsgDelete(&md))
mset.mu.RUnlock()
}
} else {
mset.removeMsg(seq)
}
})
mset.store.RegisterSubjectDeleteMarkerUpdates(func(im *inMsg) {
if mset.IsClustered() {
if mset.IsLeader() {
@@ -5164,6 +5198,15 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
return err
}
// If subject delete markers are used, ensure message TTL is that at minimum.
// Otherwise, subject delete markers could be missed if one already exists for this subject.
// MaxMsgsPer=1 is an exception, because we'll only ever have one message.
if ttl > 0 && mset.cfg.SubjectDeleteMarkerTTL > 0 && mset.cfg.MaxMsgsPer != 1 {
if minTtl := int64(mset.cfg.SubjectDeleteMarkerTTL.Seconds()); ttl < minTtl {
ttl = minTtl
}
}
// Store actual msg.
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg, ttl)
@@ -5623,7 +5666,7 @@ func (mset *stream) resetAndWaitOnConsumers() {
for _, o := range consumers {
if node := o.raftNode(); node != nil {
node.StepDown()
node.Delete()
node.Stop()
}
if o.isMonitorRunning() {
o.monitorWg.Wait()
@@ -6441,7 +6484,6 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
if !fcfg.Created.IsZero() {
mset.setCreatedTime(fcfg.Created)
}
lseq := mset.lastSeq()
// Make sure we do an update if the configs have changed.
if !reflect.DeepEqual(fcfg.StreamConfig, cfg) {
@@ -6492,7 +6534,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
obs.setCreatedTime(cfg.Created)
}
obs.mu.Lock()
err = obs.readStoredState(lseq)
err = obs.readStoredState()
obs.mu.Unlock()
if err != nil {
mset.stop(true, false)

View File

@@ -1747,14 +1747,21 @@ func IntersectStree[T any](st *stree.SubjectTree[T], sl *Sublist, cb func(subj [
}
func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb func(subj []byte, entry *T)) {
if r.numNodes() == 0 {
// For wildcards we can't avoid Match, but if it's a literal subject at
// this point, using Find is considerably cheaper.
if subjectHasWildcard(bytesToString(subj)) {
st.Match(subj, cb)
} else if e, ok := st.Find(subj); ok {
// This level could potentially match literals, despite being followed up by
// additional wildcards. For literals we can use Find since it is considerably
// faster. Then we can carry on checking for further matches in the usual way.
wc := subjectHasWildcard(bytesToString(subj))
if !wc {
if e, ok := st.Find(subj); ok {
cb(subj, e)
}
}
if r.numNodes() == 0 {
// No further recursions to be made at this point but there's still a wildcard
// to match, so let the subject tree work it out.
if wc {
st.Match(subj, cb)
}
return
}
nsubj := subj

View File

@@ -1,4 +1,4 @@
// Copyright 2024 The NATS Authors
// Copyright 2024-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -149,8 +149,8 @@ func (hw *HashWheel) Update(seq uint64, oldExpires int64, newExpires int64) erro
return hw.Add(seq, newExpires)
}
// ExpireTasks processes all expired tasks using a callback.
func (hw *HashWheel) ExpireTasks(callback func(seq uint64, expires int64)) {
// ExpireTasks processes all expired tasks using a callback, but only expires a task if the callback returns true.
func (hw *HashWheel) ExpireTasks(callback func(seq uint64, expires int64) bool) {
now := time.Now().UnixNano()
// Quick return if nothing is expired.
@@ -179,12 +179,13 @@ func (hw *HashWheel) ExpireTasks(callback func(seq uint64, expires int64)) {
// Track new lowest while processing expirations
newLowest := int64(math.MaxInt64)
for seq, expires := range slot.entries {
if expires <= now {
callback(seq, expires)
if expires <= now && callback(seq, expires) {
delete(slot.entries, seq)
hw.count--
updateLowest = true
} else if expires < newLowest {
continue
}
if expires < newLowest {
newLowest = expires
}
}
@@ -207,6 +208,11 @@ func (hw *HashWheel) GetNextExpiration(before int64) int64 {
return math.MaxInt64
}
// Count returns the amount of tasks in the THW.
func (hw *HashWheel) Count() uint64 {
return hw.count
}
// AppendEncode writes out the contents of the THW into a binary snapshot
// and returns it. The high seq number is included in the snapshot and will
// be returned on decode.

6
vendor/modules.txt vendored
View File

@@ -989,10 +989,10 @@ github.com/mschoch/smat
# github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
## explicit
github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.7.3
## explicit; go 1.22
# github.com/nats-io/jwt/v2 v2.7.4
## explicit; go 1.23.0
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.11.1
# github.com/nats-io/nats-server/v2 v2.11.2
## explicit; go 1.23.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand