Compare commits


1 Commit

Author: dependabot[bot]
Commit: 6c9bef0040
Message: build(deps): bump github.com/nats-io/nats-server/v2
Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.12.3 to 2.12.4.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.12.3...v2.12.4)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-version: 2.12.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Date: 2026-01-28 14:45:43 +00:00
26 changed files with 900 additions and 636 deletions

View File

@@ -2230,7 +2230,7 @@ def genDocsPr(ctx):
"MY_TARGET_BRANCH": "${CI_COMMIT_BRANCH##stable-}",
},
"commands": [
'export DOC_GIT_TARGET_FOLDER="$$(if [ \"$$MY_TARGET_BRANCH\" = \"main\" ]; then echo \"tmpdocs/docs/dev/server/_static/env-vars/\"; else echo \"tmpdocs/versioned_docs/version-$${MY_TARGET_BRANCH}/dev/server/_static/env-vars/\"; fi)"',
'export DOC_GIT_TARGET_FOLDER="$$(if [ \"$$MY_TARGET_BRANCH\" = \"main\" ]; then echo \"tmpdocs/docs/dev/_static/env-vars/\"; else echo \"tmpdocs/versioned_docs/version-$${MY_TARGET_BRANCH}/dev/_static/env-vars/\"; fi)"',
'echo "$${CI_SSH_KEY}" > /root/id_rsa && chmod 600 /root/id_rsa',
'git config --global user.email "devops@opencloud.eu"',
'git config --global user.name "openclouders"',
@@ -2255,8 +2255,8 @@ def genDocsPr(ctx):
},
{
"event": "cron",
"branch": "main",
"cron": "nightly*",
"branch": "[main]",
"cron": "nightly *",
},
],
}]

View File

@@ -1,23 +1,5 @@
# Changelog
## [5.0.1](https://github.com/opencloud-eu/opencloud/releases/tag/v5.0.1) - 2026-01-28
### ❤️ Thanks to all contributors! ❤️
@ScharfViktor, @aduffeck, @saw-jan
### 🐛 Bug Fixes
- Do not ever set a TTL for the ID cache. It's not supposed to expire. [[#2223](https://github.com/opencloud-eu/opencloud/pull/2223)]
### ✅ Tests
- test(api): wait for web-office readiness by checking discovery endpoint [[#2217](https://github.com/opencloud-eu/opencloud/pull/2217)]
### 📦️ Dependencies
- reva-bump-2.42.1 [[#2225](https://github.com/opencloud-eu/opencloud/pull/2225)]
## [5.0.0](https://github.com/opencloud-eu/opencloud/releases/tag/v5.0.0) - 2026-01-26
### ❤️ Thanks to all contributors! ❤️

go.mod (8 changed lines)
View File

@@ -55,7 +55,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.12.3
github.com/nats-io/nats-server/v2 v2.12.4
github.com/nats-io/nats.go v1.48.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.1.3
@@ -65,7 +65,7 @@ require (
github.com/open-policy-agent/opa v1.12.3
github.com/opencloud-eu/icap-client v0.0.0-20250930132611-28a2afe62d89
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
github.com/opencloud-eu/reva/v2 v2.42.1
github.com/opencloud-eu/reva/v2 v2.42.0
github.com/opensearch-project/opensearch-go/v4 v4.6.0
github.com/orcaman/concurrent-map v1.0.0
github.com/pkg/errors v0.9.1
@@ -241,7 +241,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.9.3 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/go-tpm v0.9.7 // indirect
github.com/google/go-tpm v0.9.8 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
github.com/google/renameio/v2 v2.0.1 // indirect
github.com/gookit/goutil v0.7.1 // indirect
@@ -261,7 +261,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/juliangruber/go-intersect v1.1.0 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
github.com/kovidgoyal/go-parallel v1.1.1 // indirect

go.sum (16 changed lines)
View File

@@ -575,8 +575,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/go-tika v0.3.1 h1:l+jr10hDhZjcgxFRfcQChRLo1bPXQeLFluMyvDhXTTA=
github.com/google/go-tika v0.3.1/go.mod h1:DJh5N8qxXIl85QkqmXknd+PeeRkUOTbvwyYf7ieDz6c=
github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA=
github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo=
github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -730,8 +730,8 @@ github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
@@ -916,8 +916,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs=
github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y=
github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts=
github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
@@ -969,8 +969,8 @@ github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9 h1:dIft
github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9/go.mod h1:JWyDC6H+5oZRdUJUgKuaye+8Ph5hEs6HVzVoPKzWSGI=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 h1:vD/EdfDUrv4omSFjrinT8Mvf+8D7f9g4vgQ2oiDrVUI=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q=
github.com/opencloud-eu/reva/v2 v2.42.1 h1:QUZOLSfAhb7bw+qsVSFMFY644rUz4/NtnOiJ0QQxj2o=
github.com/opencloud-eu/reva/v2 v2.42.1/go.mod h1:pv+w23JG0/qJweZbTzNNev//YEvlUML1L/2iXgKGkkg=
github.com/opencloud-eu/reva/v2 v2.42.0 h1:CWlXbNqUSduQ5Afi6XoegoJ/zyV0Vx5UoPKAZZmEAq4=
github.com/opencloud-eu/reva/v2 v2.42.0/go.mod h1:pv+w23JG0/qJweZbTzNNev//YEvlUML1L/2iXgKGkkg=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=

View File

@@ -34,7 +34,7 @@ var (
// LatestTag is the latest released version plus the dev meta version.
// Will be overwritten by the release pipeline
// Needs a manual change for every tagged release
LatestTag = "5.0.1+dev"
LatestTag = "5.0.0+dev"
// Date indicates the build date.
// This has been removed, it looks like you can only replace static strings with recent go versions

View File

@@ -60,6 +60,9 @@ type parser struct {
// pedantic reports error when configuration is not correct.
pedantic bool
// Tracks environment variable references, to avoid cycles
envVarReferences map[string]bool
}
// Parse will return a map of keys to any, although concrete types
@@ -180,16 +183,37 @@ func (t *token) Position() int {
return t.item.pos
}
func parse(data, fp string, pedantic bool) (p *parser, err error) {
p = &parser{
mapping: make(map[string]any),
lx: lex(data),
ctxs: make([]any, 0, 4),
keys: make([]string, 0, 4),
ikeys: make([]item, 0, 4),
fp: filepath.Dir(fp),
pedantic: pedantic,
func newParser(data, fp string, pedantic bool) *parser {
return &parser{
mapping: make(map[string]any),
lx: lex(data),
ctxs: make([]any, 0, 4),
keys: make([]string, 0, 4),
ikeys: make([]item, 0, 4),
fp: filepath.Dir(fp),
pedantic: pedantic,
envVarReferences: make(map[string]bool),
}
}
func parse(data, fp string, pedantic bool) (*parser, error) {
p := newParser(data, fp, pedantic)
if err := p.parse(fp); err != nil {
return nil, err
}
return p, nil
}
func parseEnv(data string, parent *parser) (*parser, error) {
p := newParser(data, "", false)
p.envVarReferences = parent.envVarReferences
if err := p.parse(""); err != nil {
return nil, err
}
return p, nil
}
func (p *parser) parse(fp string) error {
p.pushContext(p.mapping)
var prevItem item
@@ -199,16 +223,16 @@ func parse(data, fp string, pedantic bool) (p *parser, err error) {
// Here we allow the final character to be a bracket '}'
// in order to support JSON like configurations.
if prevItem.typ == itemKey && prevItem.val != mapEndString {
return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos)
return fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos)
}
break
}
prevItem = it
if err := p.processItem(it, fp); err != nil {
return nil, err
return err
}
}
return p, nil
return nil
}
func (p *parser) next() item {
@@ -453,11 +477,18 @@ func (p *parser) lookupVariable(varReference string) (any, bool, error) {
}
// If we are here, we have exhausted our context maps and still not found anything.
// Parse from the environment.
// Detect reference cycles
if p.envVarReferences[varReference] {
return nil, false, fmt.Errorf("variable reference cycle for '%s'", varReference)
}
p.envVarReferences[varReference] = true
defer delete(p.envVarReferences, varReference)
// Parse from the environment
if vStr, ok := os.LookupEnv(varReference); ok {
// Everything we get here will be a string value, so we need to process as a parser would.
if vmap, err := Parse(fmt.Sprintf("%s=%s", pkey, vStr)); err == nil {
v, ok := vmap[pkey]
if subp, err := parseEnv(fmt.Sprintf("%s=%s", pkey, vStr), p); err == nil {
v, ok := subp.mapping[pkey]
return v, ok, nil
} else {
return nil, false, err
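
The cycle tracking above is easiest to see with two environment variables that reference each other. A minimal sketch, assuming the exported `conf.Parse` entry point of the nats-server module (the package the diff above belongs to); before this change the lookup could recurse indefinitely, afterwards it fails with a cycle error:

```go
package main

import (
	"fmt"
	"os"

	"github.com/nats-io/nats-server/v2/conf"
)

func main() {
	// Two environment variables that reference each other form a cycle.
	os.Setenv("A", "$B")
	os.Setenv("B", "$A")

	// Resolving $A walks A -> B -> A; the parser detects the repeat
	// via envVarReferences and errors out instead of recursing forever.
	_, err := conf.Parse("key = $A")
	fmt.Println(err) // expect a "variable reference cycle" error
}
```

Note how `parseEnv` shares the parent's `envVarReferences` map, so the cycle is detected across nested parser invocations rather than within a single one.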

View File

@@ -299,6 +299,7 @@ func (a *Account) shallowCopy(na *Account) {
na.Nkey = a.Nkey
na.Issuer = a.Issuer
na.traceDest, na.traceDestSampling = a.traceDest, a.traceDestSampling
na.nrgAccount = a.nrgAccount
if a.imports.streams != nil {
na.imports.streams = make([]*streamImport, 0, len(a.imports.streams))

View File

@@ -1,4 +1,4 @@
// Copyright 2023-2024 The NATS Authors
// Copyright 2023-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2025 The NATS Authors
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand"
"net"
"net/http"
@@ -35,8 +36,6 @@ import (
"sync/atomic"
"time"
"slices"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats-server/v2/internal/fastrand"
@@ -2732,9 +2731,12 @@ func (c *client) updateS2AutoCompressionLevel(co *CompressionOpts, compression *
}
// Will return the parts from the raw wire msg.
// We return the `hdr` as a slice that is capped to the length of the headers
// so that if the caller later tries to append to the returned header slice it
// does not affect the message content.
func (c *client) msgParts(data []byte) (hdr []byte, msg []byte) {
if c != nil && c.pa.hdr > 0 {
return data[:c.pa.hdr], data[c.pa.hdr:]
return data[:c.pa.hdr:c.pa.hdr], data[c.pa.hdr:]
}
return nil, data
}
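
The three-index slice expression `data[:c.pa.hdr:c.pa.hdr]` is what enforces the guarantee described in the comment: capping the capacity at the header length forces any later `append` to reallocate instead of writing into the message bytes that share the backing array. A standalone sketch of the difference:

```go
package main

import "fmt"

func main() {
	data := []byte("HDRBODY")
	const hdrLen = 3

	// Uncapped slice: cap(hdr) extends over the message bytes,
	// so append writes into them in place.
	hdr := data[:hdrLen]
	_ = append(hdr, 'X')
	fmt.Println(string(data)) // "HDRXODY" -- message corrupted

	// Capped (three-index) slice: len == cap == hdrLen, so append
	// must allocate a new backing array; the message is untouched.
	data = []byte("HDRBODY")
	hdr = data[:hdrLen:hdrLen]
	_ = append(hdr, 'X')
	fmt.Println(string(data)) // "HDRBODY"
}
```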
@@ -3337,7 +3339,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
sub.shadow = nil
if len(shadowSubs) > 0 {
isSpokeLeaf = c.isSpokeLeafNode()
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF || c.kind == JETSTREAM) && c.srv != nil
}
sub.close()
c.mu.Unlock()
@@ -4565,6 +4567,19 @@ func getHeaderKeyIndex(key string, hdr []byte) int {
}
}
// setHeader will replace the value of the first existing key `key`
// with the given value `val`, or add this new key at the end of
// the headers.
//
// Note: If the key does not exist, or if it exists but the new value
// would make the resulting byte slice larger than the original one,
// a new byte slice is returned and the original is left untouched.
// This is to prevent situations where caller may have a `hdr` and
// `msg` that are the parts of an underlying buffer. Extending the
// `hdr` would otherwise overwrite the `msg` part.
//
// If the new value is smaller, then the original `hdr` byte slice
// is modified.
func setHeader(key, val string, hdr []byte) []byte {
start := getHeaderKeyIndex(key, hdr)
if start >= 0 {
@@ -4579,15 +4594,45 @@ func setHeader(key, val string, hdr []byte) []byte {
return hdr // malformed headers
}
valEnd += valStart
suffix := slices.Clone(hdr[valEnd:])
newHdr := append(hdr[:valStart], val...)
return append(newHdr, suffix...)
// Length of the existing value (before the `\r`)
oldValLen := valEnd - valStart
// This is how many extra bytes we need for the new value.
// If <= 0, it means that we need less and so will reuse the `hdr` buffer.
if extra := len(val) - oldValLen; extra > 0 {
// Check that we don't overflow an "int".
if rem := math.MaxInt - hdrLen; rem < extra {
// We don't grow, and return the existing header.
return hdr
}
// The new size is the old size plus the extra bytes.
newHdrSize := hdrLen + extra
newHdr := make([]byte, newHdrSize)
// Copy the parts from `hdr` and `val` into the new buffer.
n := copy(newHdr, hdr[:valStart])
n += copy(newHdr[n:], val)
copy(newHdr[n:], hdr[valEnd:])
return newHdr
}
// We can write in place since it fits in the existing `hdr` buffer.
n := copy(hdr[valStart:], val)
n += copy(hdr[valStart+n:], hdr[valEnd:])
hdr = hdr[:valStart+n]
return hdr
}
if len(hdr) > 0 && bytes.HasSuffix(hdr, []byte("\r\n")) {
hdr = hdr[:len(hdr)-2]
val += "\r\n"
}
return fmt.Appendf(hdr, "%s: %s\r\n", key, val)
// Create the new buffer based on length of existing one and
// length of the new "<key>: <value>\r\n". Protect against "int" overflow.
newSize := uint64(len(hdr)) + uint64(len(key)) + 1 + 1 + uint64(len(val)) + 2
if newSize > uint64(math.MaxInt) {
// We don't grow, and return the existing header.
return hdr
}
newHdr := make([]byte, 0, int(newSize))
newHdr = append(newHdr, hdr...)
return fmt.Appendf(newHdr, "%s: %s\r\n", key, val)
}
// For bytes.HasPrefix below.
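
The in-place branch of `setHeader` above relies on Go's `copy` handling overlapping ranges (it behaves like `memmove`), so the buffer tail can be shifted left over the old value safely. A minimal sketch with hypothetical offsets for the value bounds:

```go
package main

import "fmt"

func main() {
	hdr := []byte("Key: oldvalue\r\nNext: x\r\n")
	// Hypothetical bounds of the existing value "oldvalue".
	valStart, valEnd := 5, 13
	val := "new" // shorter than the old value, so rewrite in place

	n := copy(hdr[valStart:], val)            // overwrite with the new value
	n += copy(hdr[valStart+n:], hdr[valEnd:]) // shift the tail left
	hdr = hdr[:valStart+n]                    // drop the now-unused suffix
	fmt.Printf("%q\n", hdr) // "Key: new\r\nNext: x\r\n"
}
```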

View File

@@ -66,7 +66,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.12.3"
VERSION = "2.12.4"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2025 The NATS Authors
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1030,11 +1030,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
if cName != _EMPTY_ {
if eo, ok := mset.consumers[cName]; ok {
mset.mu.Unlock()
if action == ActionCreate {
ocfg := eo.config()
copyConsumerMetadata(config, &ocfg)
if !reflect.DeepEqual(config, &ocfg) {
mset.mu.Unlock()
return nil, NewJSConsumerAlreadyExistsError()
}
}
@@ -1042,9 +1042,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if cfg.Retention == WorkQueuePolicy {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if !mset.partitionUnique(cName, subjects) {
mset.mu.Unlock()
return nil, NewJSConsumerWQConsumerNotUniqueError()
}
}
mset.mu.Unlock()
err := eo.updateConfig(config)
if err == nil {
return eo, nil
@@ -1542,7 +1544,6 @@ func (o *consumer) setLeader(isLeader bool) {
if o.cfg.AckPolicy != AckNone {
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}
}
@@ -1551,7 +1552,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Will error if wrong mode to provide feedback to users.
if o.reqSub, err = o.subscribeInternal(o.nextMsgSubj, o.processNextMsgReq); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}
@@ -1561,7 +1561,6 @@ func (o *consumer) setLeader(isLeader bool) {
fcsubj := fmt.Sprintf(jsFlowControl, stream, o.name)
if o.fcSub, err = o.subscribeInternal(fcsubj, o.processFlowControl); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}
}
@@ -2401,7 +2400,8 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// Check for Subject Filters update.
newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
if !subjectSliceEqual(newSubjects, o.subjf.subjects()) {
updatedFilters := !subjectSliceEqual(newSubjects, o.subjf.subjects())
if updatedFilters {
newSubjf := make(subjectFilters, 0, len(newSubjects))
for _, newFilter := range newSubjects {
fs := &subjectFilter{
@@ -2440,15 +2440,17 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// Allowed but considered no-op, [Description, SampleFrequency, MaxWaiting, HeadersOnly]
o.cfg = *cfg
// Cleanup messages that lost interest.
if o.retention == InterestPolicy {
o.mu.Unlock()
o.cleanupNoInterestMessages(o.mset, false)
o.mu.Lock()
}
if updatedFilters {
// Cleanup messages that lost interest.
if o.retention == InterestPolicy {
o.mu.Unlock()
o.cleanupNoInterestMessages(o.mset, false)
o.mu.Lock()
}
// Re-calculate num pending on update.
o.streamNumPending()
// Re-calculate num pending on update.
o.streamNumPending()
}
return nil
}
@@ -5115,9 +5117,14 @@ func (o *consumer) checkNumPending() (uint64, error) {
var state StreamState
o.mset.store.FastState(&state)
npc := o.numPending()
if o.sseq > state.LastSeq && npc > 0 || npc > state.Msgs {
// Re-calculate.
return o.streamNumPending()
// Make sure we can't report more messages than there are.
// TODO(nat): It's not great that this means consumer info has side effects,
// since we can't know whether anyone will call it or not. The previous num
// pending calculation that this replaces had the same problem though.
if o.sseq > state.LastSeq {
o.npc = 0
} else if npc > 0 {
o.npc = int64(min(npc, state.Msgs, state.LastSeq-o.sseq+1))
}
}
return o.numPending(), nil
@@ -5365,6 +5372,15 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
o.pending = make(map[uint64]*Pending)
}
now := time.Now()
if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
p.Timestamp = now.UnixNano()
} else {
o.pending[sseq] = &Pending{dseq, now.UnixNano()}
}
// We could have a backoff that set a timer higher than what we need for this message.
// In that case, reset to lowest backoff required for a message redelivery.
minDelay := o.ackWait(0)
@@ -5377,18 +5393,10 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
}
minDelay = o.ackWait(o.cfg.BackOff[bi])
}
minDeadline := time.Now().Add(minDelay)
minDeadline := now.Add(minDelay)
if o.ptmr == nil || o.ptmrEnd.After(minDeadline) {
o.resetPtmr(minDelay)
}
if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
p.Timestamp = time.Now().UnixNano()
} else {
o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()}
}
}
// Credit back a failed delivery.
@@ -6503,6 +6511,10 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
retryAsflr = seq
}
} else if seq <= dflr {
// Store the first entry above our ack floor, so we don't need to look it up again on retryAsflr=0.
if retryAsflr == 0 {
retryAsflr = seq
}
// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
if _, ok := state.Pending[seq]; !ok {
// The filters are already taken into account,
@@ -6514,8 +6526,18 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}
}
// If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it.
// However, our ack floor may be lower than the next message we can receive, so we correct it upward if needed.
if retryAsflr == 0 {
retryAsflr = asflr + 1
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, asflr+1, &smv)
} else {
_, nseq, err = store.LoadNextMsg(filter, wc, asflr+1, &smv)
}
if err == nil {
retryAsflr = max(asflr+1, nseq)
} else if err == ErrStoreEOF {
retryAsflr = ss.LastSeq + 1
}
}
o.mu.Lock()

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2025 The NATS Authors
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -201,7 +201,7 @@ type fileStore struct {
sips int
dirty int
closing bool
closed bool
closed atomic.Bool // Atomic to reduce contention on ConsumerStores.
fip bool
receivedAny bool
firstMoved bool
@@ -473,10 +473,18 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
_, err = os.Stat(keyFile)
// Either the file should exist (err=nil), or it shouldn't. Any other error is reported.
if err != nil && !os.IsNotExist(err) {
return nil, err
}
// Make sure we do not have an encrypted store underneath of us but no main key.
if fs.prf == nil {
if _, err := os.Stat(keyFile); err == nil {
return nil, errNoMainKey
if fs.prf == nil && err == nil {
return nil, errNoMainKey
} else if fs.prf != nil && err == nil {
// If encryption is configured and the key file exists, recover our keys.
if err = fs.recoverAEK(); err != nil {
return nil, err
}
}
@@ -1784,16 +1792,14 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}
// Decrypt if needed.
if fs.prf != nil {
// We can be setup for encryption but if this is a snapshot restore we will be missing the keyfile
// since snapshots strip encryption.
if err := fs.recoverAEK(); err == nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
if err != nil {
fs.warn("Stream state error reading encryption key: %v", err)
return err
}
// We can be setup for encryption but if this is a snapshot restore we will be missing the keyfile
// since snapshots strip encryption.
if fs.prf != nil && fs.aek != nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
if err != nil {
fs.warn("Stream state error reading encryption key: %v", err)
return err
}
}
@@ -2346,7 +2352,7 @@ func (fs *fileStore) recoverMsgs() error {
if fs.ld != nil {
var emptyBlks []*msgBlock
for _, mb := range fs.blks {
if mb.msgs == 0 && mb.rbytes == 0 {
if mb.msgs == 0 && mb.rbytes == 0 && mb != fs.lmb {
emptyBlks = append(emptyBlks, mb)
}
}
@@ -2588,7 +2594,7 @@ func copyMsgBlocks(src []*msgBlock) []*msgBlock {
func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
fs.mu.RLock()
lastSeq := fs.state.LastSeq
closed := fs.closed
closed := fs.isClosed()
fs.mu.RUnlock()
if closed {
@@ -2603,20 +2609,51 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
fseq := atomic.LoadUint64(&mb.first.seq)
lseq := atomic.LoadUint64(&mb.last.seq)
var smv StoreMsg
var (
smv StoreMsg
cts int64
cseq uint64
off uint64
)
ts := t.UnixNano()
// Because sort.Search expects range [0,off), we have to manually
// calculate the offset from the first sequence.
off := int(lseq - fseq + 1)
i := sort.Search(off, func(i int) bool {
sm, _, _ := mb.fetchMsgNoCopy(fseq+uint64(i), &smv)
return sm != nil && sm.ts >= ts
})
if i < off {
return fseq + uint64(i)
// Using a binary search, but need to be aware of interior deletes in the block.
seq := lseq + 1
loop:
for fseq <= lseq {
mid := fseq + (lseq-fseq)/2
off = 0
// Potentially skip over gaps. We keep the original middle but keep track of a
// potential delete range with an offset.
for {
sm, _, err := mb.fetchMsgNoCopy(mid+off, &smv)
if err != nil || sm == nil {
off++
if mid+off <= lseq {
continue
} else {
// Continue search to the left. Purposely ignore the skipped deletes here.
lseq = mid - 1
continue loop
}
}
cts = sm.ts
cseq = sm.seq
break
}
if cts >= ts {
seq = cseq
if mid == fseq {
break
}
// Continue search to the left.
lseq = mid - 1
} else {
// Continue search to the right (potentially skipping over interior deletes).
fseq = mid + off + 1
}
}
return 0
return seq
}
// Find the first matching message against a sublist.
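
The rewritten `GetSeqFromTime` above is a plain binary search complicated by interior deletes: probing the midpoint may land in a gap, so the probe slides right (tracking `off`) until it finds a live message or exhausts the right half. A simplified, self-contained sketch of the same idea, modeling a block as a map from sequence to timestamp (the names and the map representation are illustrative, not the server's):

```go
package main

import "fmt"

// firstSeqAtOrAfter returns the first live sequence in [fseq, lseq]
// whose timestamp is >= ts, or lseq+1 if there is none.
// Assumes sequences start at 1, as in the store.
func firstSeqAtOrAfter(msgs map[uint64]int64, fseq, lseq uint64, ts int64) uint64 {
	seq := lseq + 1 // "not found" sentinel
loop:
	for fseq <= lseq {
		mid := fseq + (lseq-fseq)/2
		var off uint64
		for {
			if mts, ok := msgs[mid+off]; ok {
				if mts >= ts {
					seq = mid + off
					if mid == fseq {
						break loop
					}
					lseq = mid - 1 // keep searching to the left
				} else {
					fseq = mid + off + 1 // search right, past the gap
				}
				continue loop
			}
			// Deleted sequence: slide the probe to the right.
			if off++; mid+off > lseq {
				lseq = mid - 1 // right half is all deleted, go left
				continue loop
			}
		}
	}
	return seq
}

func main() {
	msgs := map[uint64]int64{1: 10, 2: 20, 5: 50, 6: 60} // 3 and 4 deleted
	fmt.Println(firstSeqAtOrAfter(msgs, 1, 6, 30))  // 5
	fmt.Println(firstSeqAtOrAfter(msgs, 1, 6, 100)) // 7 (not found)
}
```

This replaces the earlier `sort.Search` approach, which assumed a dense sequence range and could not account for the gaps left by deleted messages.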
@@ -2632,13 +2669,15 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
mb.mu.Unlock()
}()
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if mb.fssNotLoaded() {
// Make sure we have fss loaded.
if err := mb.loadMsgsWithLock(); err != nil {
return nil, false, err
}
didLoad = true
}
// Mark fss activity.
mb.lsts = ats.AccessTime()
// Make sure to start at mb.first.seq if fseq < mb.first.seq
if seq := atomic.LoadUint64(&mb.first.seq); seq > start {
@@ -2646,10 +2685,6 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
}
lseq := atomic.LoadUint64(&mb.last.seq)
if sm == nil {
sm = new(StoreMsg)
}
// If the FSS state has fewer entries than sequences in the linear scan,
// then use intersection instead as likely going to be cheaper. This will
// often be the case with high numbers of deletes, as well as a smaller
@@ -2657,7 +2692,11 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
if uint64(mb.fss.Size()) < lseq-start {
// If there are no subject matches then this is effectively no-op.
hseq := uint64(math.MaxUint64)
gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) {
var ierr error
stree.IntersectGSL(mb.fss, sl, func(subj []byte, ss *SimpleState) {
if ierr != nil {
return
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
// mb is already loaded into the cache so should be fast-ish.
mb.recalculateForSubj(bytesToString(subj), ss)
@@ -2669,6 +2708,16 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
// than our first seq for this subject.
return
}
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if ierr = mb.loadMsgsWithLock(); ierr != nil {
return
}
didLoad = true
}
if sm == nil {
sm = new(StoreMsg)
}
if first == ss.First {
// If the start floor is below where this subject starts then we can
// short-circuit, avoiding needing to scan for the next message.
@@ -2703,10 +2752,24 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
mb.llseq = llseq
}
})
if ierr != nil {
return nil, false, ierr
}
if hseq < uint64(math.MaxUint64) && sm != nil {
return sm, didLoad && start == lseq, nil
}
} else {
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil, false, err
}
didLoad = true
}
if sm == nil {
sm = new(StoreMsg)
}
for seq := start; seq <= lseq; seq++ {
if mb.dmap.Exists(seq) {
// Optimisation to avoid calling cacheLookup which hits time.Now().
@@ -2773,6 +2836,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
isAll = true
})
}
// If the only subject in this block isn't our filter, can simply short-circuit.
if !isAll {
return nil, didLoad, ErrStoreMsgNotFound
}
}
// Make sure to start at mb.first.seq if fseq < mb.first.seq
fseq = max(fseq, atomic.LoadUint64(&mb.first.seq))
@@ -2909,7 +2976,7 @@ func (mb *msgBlock) prevMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *S
if uint64(mb.fss.Size()) < start-lseq {
// If there are no subject matches then this is effectively no-op.
hseq := uint64(0)
gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) {
stree.IntersectGSL(mb.fss, sl, func(subj []byte, ss *SimpleState) {
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
// mb is already loaded into the cache so should be fast-ish.
mb.recalculateForSubj(bytesToString(subj), ss)
@@ -3188,6 +3255,30 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool, bi int) (int, e
if start == uint32(math.MaxUint32) {
return -1, ErrStoreEOF
}
return fs.selectSkipFirstBlock(bi, start, stop)
}
// This is used to see if we can selectively jump start blocks based on filter subjects and a starting block index.
// Will return -1 and ErrStoreEOF if no matches at all or no more from where we are.
func (fs *fileStore) checkSkipFirstBlockMulti(sl *gsl.SimpleSublist, bi int) (int, error) {
// Move through psim to gather start and stop bounds.
start, stop := uint32(math.MaxUint32), uint32(0)
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
})
// Nothing was found.
if start == uint32(math.MaxUint32) {
return -1, ErrStoreEOF
}
return fs.selectSkipFirstBlock(bi, start, stop)
}
func (fs *fileStore) selectSkipFirstBlock(bi int, start, stop uint32) (int, error) {
// Can not be nil so ok to inline dereference.
mbi := fs.blks[bi].getIndex()
// All matching msgs are behind us.
@@ -4006,7 +4097,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
mb := fs.blks[seqStart]
bi := mb.index
gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) {
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
// If the select blk start is greater than entry's last blk skip.
if bi > psi.lblk {
return
@@ -4105,7 +4196,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
var t uint64
var havePartial bool
var updateLLTS bool
gsl.IntersectStree[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
stree.IntersectGSL[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
subj := bytesToString(bsubj)
if havePartial {
// If we already found a partial then don't do anything else.
@@ -4168,7 +4259,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
// If we are here it's better to calculate totals from psim and adjust downward by scanning less blocks.
start := uint32(math.MaxUint32)
gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) {
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
total += psi.total
// Keep track of start index for this subject.
if psi.fblk < start {
@@ -4219,7 +4310,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
}
// Mark fss activity.
mb.lsts = ats.AccessTime()
gsl.IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
stree.IntersectGSL(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
adjust += ss.Msgs
})
}
@@ -4486,7 +4577,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
// Stores a raw message with expected sequence number and timestamp.
// Lock should be held.
func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, ttl int64) (err error) {
if fs.closed {
if fs.isClosed() {
return ErrStoreClosed
}
@@ -5159,7 +5250,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
fsLock()
if fs.closed {
if fs.isClosed() {
fsUnlock()
return false, ErrStoreClosed
}
@@ -5369,14 +5460,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// If we have a callback registered we need to release lock regardless since cb might need it to lookup msg, etc.
fs.mu.Unlock()
// Storage updates.
if cb != nil {
var subj string
if sm != nil {
subj = sm.subj
}
delta := int64(msz)
cb(-1, -delta, seq, subj)
}
delta := int64(msz)
cb(-1, -delta, seq, sm.subj)
if !needFSLock {
fs.mu.Lock()
@@ -5611,10 +5696,7 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
}
func (fs *fileStore) isClosed() bool {
fs.mu.RLock()
closed := fs.closed
fs.mu.RUnlock()
return closed
return fs.closed.Load()
}
// Will spin up our flush loop.
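
Turning `closed` into an `atomic.Bool` means `isClosed` no longer needs the store read lock at all, which is the contention reduction on ConsumerStores the field comment mentions. A sketch of the pattern in isolation:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type store struct {
	closed atomic.Bool // readable without holding any mutex
}

func (s *store) isClosed() bool { return s.closed.Load() }

func (s *store) stop() { s.closed.Store(true) }

func main() {
	s := &store{}
	fmt.Println(s.isClosed()) // false
	s.stop()
	fmt.Println(s.isClosed()) // true
}
```

The trade-off is that a lock-free `Load` can race with mutex-guarded state, which is presumably why the `stop` path in the diff still takes `fs.mu` before checking `fs.closing` and clearing `fs.lmb`.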
@@ -6995,11 +7077,10 @@ func (mb *msgBlock) ensureRawBytesLoaded() error {
// Sync msg and index files as needed. This is called from a timer.
func (fs *fileStore) syncBlocks() {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
if fs.isClosed() {
return
}
fs.mu.Lock()
blks := append([]*msgBlock(nil), fs.blks...)
lmb, firstMoved, firstSeq := fs.lmb, fs.firstMoved, fs.state.FirstSeq
// Clear first moved.
@@ -7089,11 +7170,10 @@ func (fs *fileStore) syncBlocks() {
}
}
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
if fs.isClosed() {
return
}
fs.mu.Lock()
fs.setSyncTimer()
if markDirty {
fs.dirty++
@@ -7939,17 +8019,14 @@ func (fs *fileStore) msgForSeq(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
// Will return message for the given sequence number.
func (fs *fileStore) msgForSeqLocked(seq uint64, sm *StoreMsg, needFSLock bool) (*StoreMsg, error) {
if fs.isClosed() {
return nil, ErrStoreClosed
}
// TODO(dlc) - Since Store, Remove, Skip all hold the write lock on fs this will
// be stalled. Need another lock if want to happen in parallel.
if needFSLock {
fs.mu.RLock()
}
if fs.closed {
if needFSLock {
fs.mu.RUnlock()
}
return nil, ErrStoreClosed
}
// Indicates we want first msg.
if seq == 0 {
seq = fs.state.FirstSeq
@@ -8128,10 +8205,14 @@ func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
// loadLast will load the last message for a subject. Subject should be non empty and not ">".
func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err error) {
if fs.isClosed() {
return nil, ErrStoreClosed
}
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed || fs.lmb == nil {
if fs.lmb == nil {
return nil, ErrStoreClosed
}
@@ -8228,15 +8309,15 @@ func (fs *fileStore) LoadLastMsg(subject string, smv *StoreMsg) (sm *StoreMsg, e
// LoadNextMsgMulti will find the next message matching any entry in the sublist.
func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
if fs.isClosed() {
return nil, 0, ErrStoreClosed
}
if sl == nil {
return fs.LoadNextMsg(_EMPTY_, false, start, smp)
}
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed {
return nil, 0, ErrStoreClosed
}
if fs.state.Msgs == 0 || start > fs.state.LastSeq {
return nil, fs.state.LastSeq, ErrStoreEOF
}
@@ -8244,6 +8325,31 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
start = fs.state.FirstSeq
}
// If start is less than or equal to beginning of our stream, meaning our first call,
// let's check the psim to see if we can skip ahead.
if start <= fs.state.FirstSeq {
var total uint64
blkStart := uint32(math.MaxUint32)
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
total += psi.total
// Keep track of start index for this subject.
if psi.fblk < blkStart {
blkStart = psi.fblk
}
})
// Nothing available.
if total == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// We can skip ahead.
if mb := fs.bim[blkStart]; mb != nil {
fseq := atomic.LoadUint64(&mb.first.seq)
if fseq > start {
start = fseq
}
}
}
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
for i := bi; i < len(fs.blks); i++ {
mb := fs.blks[i]
@@ -8254,8 +8360,28 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
} else if expireOk {
mb.tryForceExpireCache()
} else {
// Nothing found in this block. We missed, if first block (bi) check psim.
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
// We should not do this at all if we are already on the last block.
if i == bi && i < len(fs.blks)-1 {
nbi, err := fs.checkSkipFirstBlockMulti(sl, bi)
// Nothing available.
if err == ErrStoreEOF {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi > i {
i = nbi - 1 // For the iterator condition i++
}
}
// Check if we can expire.
if expireOk {
mb.tryForceExpireCache()
}
}
}
}
@@ -8265,12 +8391,13 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
}
func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, uint64, error) {
if fs.isClosed() {
return nil, 0, ErrStoreClosed
}
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed {
return nil, 0, ErrStoreClosed
}
if fs.state.Msgs == 0 || start > fs.state.LastSeq {
return nil, fs.state.LastSeq, ErrStoreEOF
}
@@ -8323,7 +8450,7 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
i = nbi - 1 // For the iterator condition i++
}
}
// Check is we can expire.
// Check if we can expire.
if expireOk {
mb.tryForceExpireCache()
}
@@ -8336,12 +8463,13 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
if fs.isClosed() {
return nil, ErrStoreClosed
}
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed {
return nil, ErrStoreClosed
}
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
return nil, ErrStoreEOF
}
@@ -8389,6 +8517,10 @@ func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err
// LoadPrevMsgMulti will find the previous message matching any entry in the sublist.
func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
if fs.isClosed() {
return nil, 0, ErrStoreClosed
}
if sl == nil {
sm, err = fs.LoadPrevMsg(start, smp)
return
@@ -8396,9 +8528,6 @@ func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed {
return nil, 0, ErrStoreClosed
}
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
return nil, fs.state.FirstSeq, ErrStoreEOF
}
@@ -8952,12 +9081,12 @@ func (fs *fileStore) Purge() (uint64, error) {
}
func (fs *fileStore) purge(fseq uint64) (uint64, error) {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
if fs.isClosed() {
return 0, ErrStoreClosed
}
fs.mu.Lock()
purged := fs.state.Msgs
rbytes := int64(fs.state.Bytes)
@@ -9065,6 +9194,11 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
fs.mu.Unlock()
return fs.purge(seq)
}
// Short-circuit if the store was already compacted past this point.
if fs.state.FirstSeq > seq {
fs.mu.Unlock()
return purged, nil
}
// We have to delete interior messages.
smb := fs.selectMsgBlock(seq)
if smb == nil {
@@ -9112,7 +9246,12 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
if err = smb.loadMsgsWithLock(); err != nil {
goto SKIP
}
defer smb.finishedWithCache()
defer func() {
// The lock is released once we get here, so need to re-acquire.
smb.mu.Lock()
smb.finishedWithCache()
smb.mu.Unlock()
}()
}
for mseq := atomic.LoadUint64(&smb.first.seq); mseq < seq; mseq++ {
sm, err := smb.cacheLookupNoCopy(mseq, &smv)
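
The rewritten defer above addresses a lock-discipline subtlety: by the time deferred functions run, the block lock has already been released, so `finishedWithCache` (which expects the lock held) must re-acquire it. The general shape of the pattern, as a sketch:

```go
package main

import (
	"fmt"
	"sync"
)

type block struct {
	mu    sync.Mutex
	cache []byte
}

// finishedWithCache must be called with b.mu held.
func (b *block) finishedWithCache() { b.cache = nil }

func (b *block) compactRange() {
	b.mu.Lock()
	defer func() {
		// The lock is no longer held when deferred funcs run,
		// so re-acquire it before touching guarded state.
		b.mu.Lock()
		b.finishedWithCache()
		b.mu.Unlock()
	}()
	b.cache = []byte("loaded")
	b.mu.Unlock() // released mid-function while scanning messages
	fmt.Println("scanning with the lock released")
}

func main() {
	b := &block{}
	b.compactRange()
	fmt.Println(b.cache == nil) // true: cache cleared under the lock
}
```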
@@ -9290,12 +9429,12 @@ SKIP:
// Will completely reset our store.
func (fs *fileStore) reset() error {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
if fs.isClosed() {
return ErrStoreClosed
}
fs.mu.Lock()
var purged, bytes uint64
cb := fs.scb
@@ -9381,6 +9520,10 @@ func (mb *msgBlock) tombsLocked() []msgId {
// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
func (fs *fileStore) Truncate(seq uint64) error {
if fs.isClosed() {
return ErrStoreClosed
}
// Check for request to reset.
if seq == 0 {
return fs.reset()
@@ -9388,11 +9531,6 @@ func (fs *fileStore) Truncate(seq uint64) error {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return ErrStoreClosed
}
// Any existing state file will no longer be applicable. We will force write a new one
// at the end, after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
@@ -9682,6 +9820,14 @@ func (fs *fileStore) purgeMsgBlock(mb *msgBlock) {
mb.finishedWithCache()
mb.mu.Unlock()
fs.selectNextFirst()
if cb := fs.scb; cb != nil {
// If we have a callback registered, we need to release lock regardless since consumers will recalculate pending.
fs.mu.Unlock()
// Storage updates.
cb(-int64(msgs), -int64(bytes), 0, _EMPTY_)
fs.mu.Lock()
}
}
// Called by purge to simply get rid of the cache and close our fds.
@@ -9893,6 +10039,9 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
func (fs *fileStore) resetGlobalPerSubjectInfo() {
// Clear any global subject state.
fs.psim, fs.tsl = fs.psim.Empty(), 0
if fs.noTrackSubjects() {
return
}
for _, mb := range fs.blks {
fs.populateGlobalPerSubjectInfo(mb)
}
@@ -10230,6 +10379,10 @@ func (fs *fileStore) forceWriteFullStateLocked() error {
// 3. MBs - Index, Bytes, First and Last Sequence and Timestamps, and the deleted map (avl.seqset).
// 4. Last block index and hash of record inclusive to this stream state.
func (fs *fileStore) _writeFullState(force, needLock bool) error {
if fs.isClosed() {
return nil
}
fsLock := func() {
if needLock {
fs.mu.Lock()
@@ -10243,7 +10396,7 @@ func (fs *fileStore) _writeFullState(force, needLock bool) error {
start := time.Now()
fsLock()
if fs.closed || fs.dirty == 0 {
if fs.dirty == 0 {
fsUnlock()
return nil
}
@@ -10494,8 +10647,12 @@ func (fs *fileStore) Stop() error {
// Stop the current filestore.
func (fs *fileStore) stop(delete, writeState bool) error {
if fs.isClosed() {
return ErrStoreClosed
}
fs.mu.Lock()
if fs.closed || fs.closing {
if fs.closing {
fs.mu.Unlock()
return ErrStoreClosed
}
@@ -10531,7 +10688,7 @@ func (fs *fileStore) stop(delete, writeState bool) error {
// Mark as closed. Last message block needs to be cleared after
// writeFullState has completed.
fs.closed = true
fs.closed.Store(true)
fs.lmb = nil
// We should update the upper usage layer on a stop.
@@ -10750,11 +10907,12 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool, err
// Create a snapshot of this stream and its consumer's state along with messages.
func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumers bool) (*SnapshotResult, error) {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
if fs.isClosed() {
return nil, ErrStoreClosed
}
fs.mu.Lock()
// Only allow one at a time.
if fs.sips > 0 {
fs.mu.Unlock()
@@ -11074,7 +11232,9 @@ func (fs *fileStore) ConsumerStore(name string, created time.Time, cfg *Consumer
go o.flushLoop(o.fch, o.qch)
// Make sure to load in our state from disk if needed.
o.loadState()
if err = o.loadState(); err != nil {
return nil, err
}
// Assign to filestore.
fs.AddConsumer(o)
@@ -11764,10 +11924,15 @@ func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, er
}
// Lock should be held. Called at startup.
func (o *consumerFileStore) loadState() {
func (o *consumerFileStore) loadState() error {
if _, err := os.Stat(o.ifn); err == nil {
// This will load our state in from disk.
o.stateWithCopyLocked(false)
_, err = o.stateWithCopyLocked(false)
return err
} else if os.IsNotExist(err) {
return nil
} else {
return err
}
}

View File

@@ -17,9 +17,6 @@ import (
"errors"
"strings"
"sync"
"unsafe"
"github.com/nats-io/nats-server/v2/server/stree"
)
// Sublist is a routing mechanism to handle subject distribution and
@@ -251,7 +248,9 @@ func matchLevelForAny[T comparable](l *level[T], toks []string, np *int) bool {
if np != nil {
*np += len(n.subs)
}
return len(n.subs) > 0
if len(n.subs) > 0 {
return true
}
}
if pwc != nil {
if np != nil {
@@ -370,6 +369,36 @@ func (s *GenericSublist[T]) Remove(subject string, value T) error {
return s.remove(subject, value, true)
}
// HasInterestStartingIn is a helper for subject tree intersection.
func (s *GenericSublist[T]) HasInterestStartingIn(subj string) bool {
s.RLock()
defer s.RUnlock()
var _tokens [64]string
tokens := tokenizeSubjectIntoSlice(_tokens[:0], subj)
return hasInterestStartingIn(s.root, tokens)
}
func hasInterestStartingIn[T comparable](l *level[T], tokens []string) bool {
if l == nil {
return false
}
if len(tokens) == 0 {
return true
}
token := tokens[0]
if l.fwc != nil {
return true
}
found := false
if pwc := l.pwc; pwc != nil {
found = found || hasInterestStartingIn(pwc.next, tokens[1:])
}
if n := l.nodes[token]; n != nil {
found = found || hasInterestStartingIn(n.next, tokens[1:])
}
return found
}
// pruneNode is used to prune an empty node from the tree.
func (l *level[T]) pruneNode(n *node[T], t string) {
if n == nil {
@@ -463,86 +492,15 @@ func visitLevel[T comparable](l *level[T], depth int) int {
return maxDepth
}
// IntersectStree will match all items in the given subject tree that
// have interest expressed in the given sublist. The callback will only be called
// once for each subject, regardless of overlapping subscriptions in the sublist.
func IntersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], sl *GenericSublist[T2], cb func(subj []byte, entry *T1)) {
var _subj [255]byte
intersectStree(st, sl.root, _subj[:0], cb)
}
func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T2], subj []byte, cb func(subj []byte, entry *T1)) {
nsubj := subj
if len(nsubj) > 0 {
nsubj = append(subj, '.')
}
if r.fwc != nil {
// We've reached a full wildcard, do a FWC match on the stree at this point
// and don't keep iterating downward.
nsubj := append(nsubj, '>')
st.Match(nsubj, cb)
return
}
if r.pwc != nil {
// We've found a partial wildcard. We'll keep iterating downwards, but first
// check whether there's interest at this level (without triggering dupes) and
// match if so.
var done bool
nsubj := append(nsubj, '*')
if len(r.pwc.subs) > 0 {
st.Match(nsubj, cb)
done = true
}
if r.pwc.next.numNodes() > 0 {
intersectStree(st, r.pwc.next, nsubj, cb)
}
if done {
return
}
}
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
// A wildcard at the next level will already visit these descendents
// so skip so we don't callback the same subject more than once.
continue
}
nsubj := append(nsubj, t...)
if len(n.subs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
}
}
if n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
}
// Determine if a subject has any wildcard tokens.
func subjectHasWildcard(subject string) bool {
// This one exits earlier then !subjectIsLiteral(subject)
for i, c := range subject {
if c == pwc || c == fwc {
if (i == 0 || subject[i-1] == btsep) &&
(i+1 == len(subject) || subject[i+1] == btsep) {
return true
}
}
}
return false
}
// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer.
// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same.
func bytesToString(b []byte) string {
if len(b) == 0 {
return _EMPTY_
}
p := unsafe.SliceData(b)
return unsafe.String(p, len(b))
// use similar to append. meaning, the updated slice will be returned
func tokenizeSubjectIntoSlice(tts []string, subject string) []string {
start := 0
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tts = append(tts, subject[start:i])
start = i + 1
}
}
tts = append(tts, subject[start:])
return tts
}
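
`tokenizeSubjectIntoSlice` follows the `append` calling convention so callers can pass a stack-backed slice and avoid a heap allocation for short subjects, which is how `HasInterestStartingIn` above uses it. A runnable sketch with a local copy of the helper (in the server it is unexported):

```go
package main

import "fmt"

// Local copy for illustration; splits on '.' (btsep in the server).
func tokenizeSubjectIntoSlice(tts []string, subject string) []string {
	start := 0
	for i := 0; i < len(subject); i++ {
		if subject[i] == '.' {
			tts = append(tts, subject[start:i])
			start = i + 1
		}
	}
	return append(tts, subject[start:])
}

func main() {
	// Append-style convention: the backing array lives on the stack
	// as long as the subject has at most 64 tokens.
	var _tokens [64]string
	tokens := tokenizeSubjectIntoSlice(_tokens[:0], "foo.bar.baz")
	fmt.Println(tokens) // [foo bar baz]
}
```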

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2025 The NATS Authors
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1142,6 +1142,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
}
js.mu.Lock()
// Accounts get reset to nil on shutdown, since we re-acquire the locks here, we need to check again.
if js.accounts == nil {
js.mu.Unlock()
return NewJSNotEnabledError()
}
if jsa, ok := js.accounts[a.Name]; ok {
a.mu.Lock()
a.js = jsa
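
The `js.accounts == nil` guard above is the standard re-check-after-reacquire pattern: a condition verified before a lock was dropped can be invalidated (here, by shutdown) before the lock is taken again. A generic sketch of the shape:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type jetStream struct {
	mu       sync.Mutex
	accounts map[string]struct{} // reset to nil on shutdown
}

func (js *jetStream) enable(name string) error {
	js.mu.Lock()
	defer js.mu.Unlock()
	// Re-check after acquiring: shutdown may have run since the
	// caller last observed JetStream as enabled.
	if js.accounts == nil {
		return errors.New("jetstream not enabled")
	}
	js.accounts[name] = struct{}{}
	return nil
}

func main() {
	js := &jetStream{} // accounts nil, as after shutdown
	fmt.Println(js.enable("ACC")) // jetstream not enabled
}
```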
@@ -1370,7 +1376,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
}
obs, err := mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false)
if err != nil {
s.Warnf(" Error adding consumer %q: %v", cfg.Name, err)
s.Warnf(" Error adding consumer '%s > %s > %s': %v", a.Name, mset.name(), cfg.Name, err)
continue
}
if isEphemeral {
@@ -1379,9 +1385,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
if !cfg.Created.IsZero() {
obs.setCreatedTime(cfg.Created)
}
if err != nil {
s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err)
}
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2025 The NATS Authors
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1296,11 +1296,6 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1319,6 +1314,12 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if !doErr {
return
@@ -1612,11 +1613,6 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1635,6 +1631,12 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -1729,11 +1731,6 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1752,6 +1749,12 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -1832,11 +1835,6 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1855,6 +1853,12 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -1967,11 +1971,6 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
Streams: []*StreamInfo{},
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1990,6 +1989,12 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -2090,11 +2095,6 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
resp.ApiResponse.Type = JSApiStreamCreateResponseType
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var clusterWideConsCount int
@@ -2184,6 +2184,12 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -2309,11 +2315,6 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
name := tokenAt(subject, 6)
var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are not in clustered mode this is a failed request.
if !s.JetStreamIsClustered() {
@@ -2345,6 +2346,12 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
return
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -2421,11 +2428,6 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
}
var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are not in clustered mode this is a failed request.
if !s.JetStreamIsClustered() {
@@ -2460,6 +2462,13 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
} else if sa == nil {
return
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var ca *consumerAssignment
if sa.consumers != nil {
ca = sa.consumers[consumer]
@@ -2547,11 +2556,6 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco
name := tokenAt(subject, 6)
var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are not in clustered mode this is a failed request.
if !s.JetStreamIsClustered() {
@@ -2580,6 +2584,12 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco
return
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3072,11 +3082,6 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
accName := tokenAt(subject, 5)
var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !s.JetStreamIsClustered() {
var streams []*stream
@@ -3117,6 +3122,12 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
return
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if js.isMetaRecovering() {
// While in recovery mode, the data structures are not fully initialized
resp.Error = NewJSClusterNotAvailError()
@@ -3339,11 +3350,6 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -3362,6 +3368,12 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3414,11 +3426,6 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su
stream := tokenAt(subject, 6)
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
@@ -3467,6 +3474,12 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3538,11 +3551,6 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
stream := tokenAt(subject, 6)
var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
@@ -3591,6 +3599,12 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3695,31 +3709,8 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account
stream := streamNameFromSubject(subject)
consumer := consumerNameFromSubject(subject)
var req JSApiConsumerUnpinRequest
var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if req.Group == _EMPTY_ {
resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !validGroupName.MatchString(req.Group) {
resp.Error = NewJSConsumerInvalidGroupNameError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if s.JetStreamIsClustered() {
// Check to make sure the stream is assigned.
js, cc := s.getJetStreamCluster()
@@ -3771,6 +3762,31 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req JSApiConsumerUnpinRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if req.Group == _EMPTY_ {
resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !validGroupName.MatchString(req.Group) {
resp.Error = NewJSConsumerInvalidGroupNameError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3834,11 +3850,6 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
stream := streamNameFromSubject(subject)
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
@@ -3890,6 +3901,12 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -4507,11 +4524,6 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req CreateConsumerRequest
if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
@@ -4548,6 +4560,20 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
var streamName, consumerName, filteredSubject string
var rt ccReqType
@@ -4580,14 +4606,6 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
if streamName != req.Stream {
resp.Error = NewJSStreamMismatchError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -4732,11 +4750,6 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
Consumers: []string{},
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -4755,6 +4768,12 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -4859,11 +4878,6 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
Consumers: []*ConsumerInfo{},
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -4883,7 +4897,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSClusterNotAvailError()
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -4973,11 +4987,6 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
consumerName := consumerNameFromSubject(subject)
var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = NewJSNotEmptyRequestError()
@@ -5128,6 +5137,12 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !acc.JetStreamEnabled() {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -5175,11 +5190,6 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun
}
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -5198,6 +5208,12 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -5253,11 +5269,6 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
var req JSApiConsumerPauseRequest
var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if isJSONObjectOrArray(msg) {
if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
@@ -5285,6 +5296,12 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
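A recurring change across the handlers above is that the required-API-level check moves from the top of each handler to after the clustered-mode routing, so that only the server actually responsible for the request rejects it, rather than every node answering with the same error. A minimal sketch of the reordered shape; the names requiredAPILevel, serverAPILevel, and the header name are hypothetical stand-ins for the server's internals:

package main

import (
	"fmt"
	"net/textproto"
	"strconv"
)

const serverAPILevel = 2 // assumed: the API level this server implements

// requiredAPILevel parses a client-supplied header such as
// "Nats-Required-Api-Level: 3" (header name assumed for illustration).
func requiredAPILevel(hdr textproto.MIMEHeader) (int, bool) {
	v := hdr.Get("Nats-Required-Api-Level")
	if v == "" {
		return 0, false
	}
	lvl, err := strconv.Atoi(v)
	return lvl, err == nil
}

func handleRequest(hdr textproto.MIMEHeader, clustered, responsible bool) error {
	// In clustered mode, decide first whether this server should answer at
	// all; otherwise every node would send the same error response.
	if clustered && !responsible {
		return nil // another server will respond
	}
	// Only the responding server enforces the required API level.
	if lvl, ok := requiredAPILevel(hdr); ok && lvl > serverAPILevel {
		return fmt.Errorf("required API level %d exceeds supported level %d", lvl, serverAPILevel)
	}
	return nil // proceed with the actual request
}

func main() {
	hdr := textproto.MIMEHeader{}
	hdr.Set("Nats-Required-Api-Level", "3")
	fmt.Println(handleRequest(hdr, true, true)) // error: level too high
}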


@@ -3142,20 +3142,20 @@ func (mset *stream) resetClusteredState(err error) bool {
stype, tierName, replicas := mset.cfg.Storage, mset.tier, mset.cfg.Replicas
mset.mu.RUnlock()
// The stream might already be deleted and not assigned to us anymore.
// In any case, don't revive the stream if it's already closed.
if mset.closed.Load() || (node != nil && node.IsDeleted()) {
s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
// Explicitly returning true here, we want the outside to break out of the monitoring loop as well.
return true
}
assert.Unreachable("Reset clustered state", map[string]any{
"stream": name,
"account": acc.Name,
"err": err,
})
// The stream might already be deleted and not assigned to us anymore.
// In any case, don't revive the stream if it's already closed.
if mset.closed.Load() {
s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
// Explicitly returning true here, we want the outside to break out of the monitoring loop as well.
return true
}
// Stepdown regardless if we are the leader here.
if node != nil {
node.StepDown()


@@ -19,7 +19,6 @@ import (
"fmt"
"math"
"slices"
"sort"
"sync"
"time"
@@ -445,7 +444,6 @@ func (ms *memStore) RegisterProcessJetStreamMsg(cb ProcessJetStreamMsgHandler) {
// GetSeqFromTime looks for the first sequence number that has the message
// with >= timestamp.
// FIXME(dlc) - inefficient.
func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
ts := t.UnixNano()
ms.mu.RLock()
@@ -469,18 +467,57 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
last := lmsg.ts
if ts == last {
return ms.state.LastSeq
return lmsg.seq
}
if ts > last {
return ms.state.LastSeq + 1
}
index := sort.Search(len(ms.msgs), func(i int) bool {
if msg := ms.msgs[ms.state.FirstSeq+uint64(i)]; msg != nil {
return msg.ts >= ts
var (
cts int64
cseq uint64
off uint64
)
// Using a binary search, but need to be aware of interior deletes.
fseq := ms.state.FirstSeq
lseq := ms.state.LastSeq
seq := lseq + 1
loop:
for fseq <= lseq {
mid := fseq + (lseq-fseq)/2
off = 0
// Potentially skip over gaps. We keep the original middle but keep track of a
// potential delete range with an offset.
for {
msg := ms.msgs[mid+off]
if msg == nil {
off++
if mid+off <= lseq {
continue
} else {
// Continue search to the left. Purposely ignore the skipped deletes here.
lseq = mid - 1
continue loop
}
}
cts = msg.ts
cseq = msg.seq
break
}
return false
})
return uint64(index) + ms.state.FirstSeq
if cts >= ts {
seq = cseq
if mid == fseq {
break
}
// Continue search to the left.
lseq = mid - 1
} else {
// Continue search to the right (potentially skipping over interior deletes).
fseq = mid + off + 1
}
}
return seq
}
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
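The rewritten GetSeqFromTime above replaces sort.Search with a hand-rolled binary search that tolerates interior deletes: when the probed middle sequence is missing, it scans right with an offset, and if the entire right half turns out to be deleted it resumes the search to the left. A self-contained sketch of the same idea, assuming sequences start at 1 and using a plain map with missing entries standing in for deletes:

package main

import "fmt"

type msg struct {
	seq uint64
	ts  int64
}

// firstSeqAtOrAfter mirrors the search above in simplified form: msgs is keyed
// by sequence and missing entries stand in for interior deletes. It returns
// the first sequence whose timestamp is >= ts, or last+1 if none qualifies.
// Assumes first >= 1, as store sequences are.
func firstSeqAtOrAfter(msgs map[uint64]*msg, first, last uint64, ts int64) uint64 {
	seq := last + 1
	fseq, lseq := first, last
loop:
	for fseq <= lseq {
		mid := fseq + (lseq-fseq)/2
		var off uint64
		var m *msg
		for {
			if m = msgs[mid+off]; m != nil {
				break
			}
			off++
			if mid+off > lseq {
				lseq = mid - 1 // whole right side from mid is deleted; search left
				continue loop
			}
		}
		if m.ts >= ts {
			seq = m.seq
			if mid == fseq {
				break
			}
			lseq = mid - 1 // keep looking for an earlier match
		} else {
			fseq = mid + off + 1 // skip past the gap to the right
		}
	}
	return seq
}

func main() {
	msgs := map[uint64]*msg{
		1: {seq: 1, ts: 10}, // seqs 2 and 3 are deleted
		4: {seq: 4, ts: 40},
		5: {seq: 5, ts: 50},
	}
	fmt.Println(firstSeqAtOrAfter(msgs, 1, 5, 25)) // 4
}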
@@ -906,7 +943,7 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPerS
var havePartial bool
var totalSkipped uint64
// We will track start and end sequences as we go.
gsl.IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
stree.IntersectGSL[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
}
@@ -1463,6 +1500,12 @@ func (ms *memStore) compact(seq uint64) (uint64, error) {
var purged, bytes uint64
ms.mu.Lock()
// Short-circuit if the store was already compacted past this point.
if ms.state.FirstSeq > seq {
ms.mu.Unlock()
return purged, nil
}
cb := ms.scb
if seq <= ms.state.LastSeq {
fseq := ms.state.FirstSeq


@@ -1,4 +1,4 @@
// Copyright 2013-2025 The NATS Authors
// Copyright 2013-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1279,6 +1279,7 @@ type Varz struct {
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats are statistics about all detected Slow Consumer
StaleConnectionStats *StaleConnectionStats `json:"stale_connection_stats,omitempty"` // StaleConnectionStats are statistics about all detected Stale Connections
Proxies *ProxiesOptsVarz `json:"proxies,omitempty"` // Proxies hold information about network proxy devices
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate of this server
}
// JetStreamVarz contains basic runtime information about jetstream
@@ -1291,34 +1292,36 @@ type JetStreamVarz struct {
// ClusterOptsVarz contains monitoring cluster information
type ClusterOptsVarz struct {
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections
Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections
Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
// GatewayOptsVarz contains monitoring gateway information
type GatewayOptsVarz struct {
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections
Port int `json:"port,omitempty"` // Port is the port gateway connections listen on
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients
ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes
RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections
Port int `json:"port,omitempty"` // Port is the port gateway connections listen on
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients
ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes
RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
// RemoteGatewayOptsVarz contains monitoring remote gateway information
@@ -1340,6 +1343,7 @@ type LeafNodeOptsVarz struct {
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
// DenyRules Contains lists of subjects not allowed to be imported/exported
@@ -1370,6 +1374,7 @@ type MQTTOptsVarz struct {
AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete
MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
// WebsocketOptsVarz contains monitoring websocket information
@@ -1388,6 +1393,7 @@ type WebsocketOptsVarz struct {
AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins
Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
// OCSPResponseCacheVarz contains OCSP response cache information
@@ -1454,6 +1460,22 @@ func myUptime(d time.Duration) string {
return fmt.Sprintf("%ds", tsecs)
}
func tlsCertNotAfter(config *tls.Config) time.Time {
if config == nil || len(config.Certificates) == 0 {
return time.Time{}
}
cert := config.Certificates[0]
leaf := cert.Leaf
if leaf == nil {
var err error
leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return time.Time{}
}
}
return leaf.NotAfter
}
// HandleRoot will show basic info and links to others handlers.
func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
// This feels dumb to me, but is required: https://code.google.com/p/go/issues/detail?id=4799
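The new tlsCertNotAfter helper feeds the TLSCertNotAfter fields added to the various Varz structs: it prefers the pre-parsed Leaf certificate and otherwise parses the first DER block. A standalone sketch that restates the helper and exercises it against a throwaway self-signed certificate (the certificate fields are kept to an illustrative minimum):

package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// certNotAfter is a local re-statement of the helper above: it prefers the
// pre-parsed Leaf and falls back to parsing the raw DER bytes.
func certNotAfter(config *tls.Config) time.Time {
	if config == nil || len(config.Certificates) == 0 {
		return time.Time{}
	}
	cert := config.Certificates[0]
	if cert.Leaf != nil {
		return cert.Leaf.NotAfter
	}
	leaf, err := x509.ParseCertificate(cert.Certificate[0])
	if err != nil {
		return time.Time{}
	}
	return leaf.NotAfter
}

func main() {
	// Build a throwaway self-signed certificate valid for 30 days.
	pub, priv, _ := ed25519.GenerateKey(rand.Reader)
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: "example"},
		NotBefore:    time.Now(),
		NotAfter:     time.Now().Add(30 * 24 * time.Hour),
	}
	der, _ := x509.CreateCertificate(rand.Reader, tmpl, tmpl, pub, priv)
	cfg := &tls.Config{Certificates: []tls.Certificate{{
		Certificate: [][]byte{der},
		PrivateKey:  priv,
	}}}
	fmt.Println("expires in:", time.Until(certNotAfter(cfg)).Round(time.Hour))
}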
@@ -1779,6 +1801,13 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
v.TLSOCSPPeerVerify = s.ocspPeerVerify && v.TLSRequired && s.opts.tlsConfigOpts != nil && s.opts.tlsConfigOpts.OCSPPeerConfig != nil && s.opts.tlsConfigOpts.OCSPPeerConfig.Verify
v.TLSCertNotAfter = tlsCertNotAfter(opts.TLSConfig)
v.Cluster.TLSCertNotAfter = tlsCertNotAfter(opts.Cluster.TLSConfig)
v.Gateway.TLSCertNotAfter = tlsCertNotAfter(opts.Gateway.TLSConfig)
v.LeafNode.TLSCertNotAfter = tlsCertNotAfter(opts.LeafNode.TLSConfig)
v.MQTT.TLSCertNotAfter = tlsCertNotAfter(opts.MQTT.TLSConfig)
v.Websocket.TLSCertNotAfter = tlsCertNotAfter(opts.Websocket.TLSConfig)
if opts.Proxies != nil {
if v.Proxies == nil {
v.Proxies = &ProxiesOptsVarz{}
@@ -3982,6 +4011,11 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
return health
}
// Healthz returns the health status of the server.
func (s *Server) Healthz(opts *HealthzOptions) *HealthStatus {
return s.healthz(opts)
}
type ExpvarzStatus struct {
Memstats json.RawMessage `json:"memstats"`
Cmdline json.RawMessage `json:"cmdline"`


@@ -2326,7 +2326,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error {
case "cluster_traffic":
vv, ok := mv.(string)
if !ok {
return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)}
return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'owner' string value for %q, got %v", mk, mv)}
}
switch vv {
case "system", _EMPTY_:


@@ -1,4 +1,4 @@
// Copyright 2020-2025 The NATS Authors
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -83,6 +83,7 @@ type RaftNode interface {
Stop()
WaitForStop()
Delete()
IsDeleted() bool
RecreateInternalSubs() error
IsSystemAccount() bool
GetTrafficAccountName() string
@@ -231,6 +232,7 @@ type raft struct {
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
membChanging bool // There is a membership change proposal in progress
deleted bool // If the node was deleted.
}
type proposedEntry struct {
@@ -1735,11 +1737,6 @@ func (n *raft) StepDown(preferred ...string) error {
}
}
}
// Clear our vote state.
n.vote = noVote
n.writeTermVote()
n.Unlock()
if len(preferred) > 0 && maybeLeader == noLeader {
@@ -1920,6 +1917,7 @@ func (n *raft) Delete() {
n.Lock()
defer n.Unlock()
n.deleted = true
if wal := n.wal; wal != nil {
wal.Delete(false)
}
@@ -1927,6 +1925,12 @@ func (n *raft) Delete() {
n.debug("Deleted")
}
func (n *raft) IsDeleted() bool {
n.RLock()
defer n.RUnlock()
return n.deleted
}
func (n *raft) shutdown() {
// First call to Stop or Delete should close the quit chan
// to notify the runAs goroutines to stop what they're doing.
@@ -3401,9 +3405,9 @@ func (n *raft) runAsCandidate() {
n.requestVote()
// We vote for ourselves.
votes := map[string]struct{}{
n.ID(): {},
}
n.votes.push(&voteResponse{term: n.term, peer: n.ID(), granted: true})
votes := map[string]struct{}{}
emptyVotes := map[string]struct{}{}
for n.State() == Candidate {
@@ -3968,10 +3972,6 @@ CONTINUE:
// Here we can become a leader but need to wait for resume of the apply queue.
n.lxfer = true
}
} else if n.vote != noVote {
// Since we are here we are not the chosen one but we should clear any vote preference.
n.vote = noVote
n.writeTermVote()
}
}
case EntryAddPeer:
@@ -4211,6 +4211,9 @@ func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) error {
if !shouldStore {
ae.returnToPool()
}
if n.csz == 1 {
n.tryCommit(n.pindex)
}
return nil
}


@@ -1,4 +1,4 @@
// Copyright 2019-2025 The NATS Authors
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -425,6 +425,7 @@ type stream struct {
active bool // Indicates that there are active internal subscriptions (for the subject filters)
// and/or mirror/sources consumers are scheduled to be established or already started.
closed atomic.Bool // Set to true when stop() is called on the stream.
cisrun atomic.Bool // Indicates one checkInterestState is already running.
// Mirror
mirror *sourceInfo
@@ -837,9 +838,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
// Add created timestamp used for the store, must match that of the stream assignment if it exists.
if sa != nil {
js.mu.RLock()
// The following assignment does not require mutex
// protection: sa.Created is immutable.
mset.created = sa.Created
js.mu.RUnlock()
}
// Start our signaling routine to process consumers.
@@ -1821,7 +1822,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
// check for duplicates
var iNames = make(map[string]struct{})
for _, src := range cfg.Sources {
if !isValidName(src.Name) {
if src == nil || !isValidName(src.Name) {
return StreamConfig{}, NewJSSourceInvalidStreamNameError()
}
if _, ok := iNames[src.composeIName()]; !ok {
@@ -3162,7 +3163,6 @@ func (mset *stream) setupMirrorConsumer() error {
}
mirror := mset.mirror
mirrorWg := &mirror.wg
// We want to throttle here in terms of how fast we request new consumers,
// or if the previous is still in progress.
@@ -3321,7 +3321,16 @@ func (mset *stream) setupMirrorConsumer() error {
// Wait for previous processMirrorMsgs go routine to be completely done.
// If none is running, this will not block.
mirrorWg.Wait()
mset.mu.Lock()
if mset.mirror == nil {
// Mirror config has been removed.
mset.mu.Unlock()
return
} else {
wg := &mset.mirror.wg
mset.mu.Unlock()
wg.Wait()
}
select {
case ccr := <-respCh:
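The setupMirrorConsumer change above re-reads mset.mirror under the lock before waiting on its WaitGroup, instead of waiting on a pointer captured earlier, and bails out if the mirror configuration was removed in the meantime. A condensed sketch of that re-check pattern, with hypothetical stream/job types:

package main

import (
	"fmt"
	"sync"
)

type job struct{ wg sync.WaitGroup }

type stream struct {
	mu     sync.Mutex
	mirror *job // may be swapped or cleared while we are unlocked
}

// waitForMirror re-reads the field under the lock right before waiting,
// and reports false if the mirror was removed in the meantime.
func (s *stream) waitForMirror() bool {
	s.mu.Lock()
	if s.mirror == nil {
		s.mu.Unlock()
		return false // mirror config has been removed
	}
	wg := &s.mirror.wg
	s.mu.Unlock()
	wg.Wait() // never wait while holding the lock
	return true
}

func main() {
	s := &stream{mirror: &job{}}
	fmt.Println(s.waitForMirror()) // true: WaitGroup is at zero, returns immediately
}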
@@ -6045,13 +6054,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
return nil
}
// If here we will attempt to store the message.
// Assume this will succeed.
olmsgId := mset.lmsgId
mset.lmsgId = msgId
mset.lseq++
tierName := mset.tier
// Republish state if needed.
var tsubj string
var tlseq uint64
@@ -6075,7 +6077,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// If clustered this was already checked and we do not want to check here and possibly introduce skew.
// Don't error and log if we're tracing when clustered.
if !isClustered {
if exceeded, err := jsa.wouldExceedLimits(stype, tierName, mset.cfg.Replicas, subject, hdr, msg); exceeded {
if exceeded, err := jsa.wouldExceedLimits(stype, mset.tier, mset.cfg.Replicas, subject, hdr, msg); exceeded {
if err == nil {
err = NewJSAccountResourcesExceededError()
}
@@ -6134,11 +6136,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err)
return err
}
// If we did not succeed put those values back and increment clfs in case we are clustered.
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
// If we did not succeed increment clfs in case we are clustered.
bumpCLFS()
switch err {
@@ -6159,6 +6157,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
// If here we succeeded in storing the message.
mset.lmsgId = msgId
mset.lseq = seq
// If we have a msgId make sure to save.
// This will replace our estimate from the cluster layer if we are clustered.
@@ -7298,11 +7298,30 @@ func (mset *stream) checkInterestState() {
return
}
// Ensure only one of these runs at the same time.
if !mset.cisrun.CompareAndSwap(false, true) {
return
}
defer mset.cisrun.Store(false)
var ss StreamState
mset.store.FastState(&ss)
asflr := uint64(math.MaxUint64)
for _, o := range mset.getConsumers() {
o.checkStateForInterestStream(&ss)
o.mu.RLock()
chkflr := o.chkflr
o.mu.RUnlock()
asflr = min(asflr, chkflr)
}
mset.cfgMu.RLock()
rp := mset.cfg.Retention
mset.cfgMu.RUnlock()
// Remove as many messages from the "head" of the stream if there's no interest anymore.
if rp == InterestPolicy && asflr != math.MaxUint64 {
mset.store.Compact(asflr)
}
}
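The cisrun guard above uses an atomic CompareAndSwap so that overlapping calls to checkInterestState skip the work rather than queueing behind a mutex. A minimal sketch of that single-flight pattern, with hypothetical worker/checkState names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type worker struct {
	running atomic.Bool
	runs    atomic.Int64
}

// checkState lets the first caller win the CompareAndSwap and do the work;
// callers that arrive while it is running return immediately.
func (w *worker) checkState() {
	if !w.running.CompareAndSwap(false, true) {
		return // someone else is already checking
	}
	defer w.running.Store(false)
	w.runs.Add(1) // the actual (expensive) state check would go here
}

func main() {
	var w worker
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); w.checkState() }()
	}
	wg.Wait()
	fmt.Println("runs:", w.runs.Load()) // at most 100; overlapping callers skipped, not queued
}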
@@ -7389,20 +7408,18 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
o.sigSubs = nil
}
if o.isLeader() {
if mset.csl == nil {
mset.csl = gsl.NewSublist[*consumer]()
}
// If no filters are preset, add fwcs to sublist for that consumer.
if newFilters == nil {
mset.csl.Insert(fwcs, o)
o.sigSubs = append(o.sigSubs, fwcs)
// If there are filters, add their subjects to sublist.
} else {
for _, filter := range newFilters {
mset.csl.Insert(filter, o)
o.sigSubs = append(o.sigSubs, filter)
}
if mset.csl == nil {
mset.csl = gsl.NewSublist[*consumer]()
}
// If no filters are present, add fwcs to sublist for that consumer.
if newFilters == nil {
mset.csl.Insert(fwcs, o)
o.sigSubs = append(o.sigSubs, fwcs)
} else {
// If there are filters, add their subjects to sublist.
for _, filter := range newFilters {
mset.csl.Insert(filter, o)
o.sigSubs = append(o.sigSubs, filter)
}
}
o.mu.Unlock()
@@ -7479,14 +7496,18 @@ func (mset *stream) partitionUnique(name string, partitions []string) bool {
if n == name {
continue
}
o.mu.RLock()
if o.subjf == nil {
o.mu.RUnlock()
return false
}
for _, filter := range o.subjf {
if SubjectsCollide(partition, filter.subject) {
o.mu.RUnlock()
return false
}
}
o.mu.RUnlock()
}
}
return true


@@ -16,6 +16,9 @@ package stree
import (
"bytes"
"slices"
"unsafe"
"github.com/nats-io/nats-server/v2/server/gsl"
)
// SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects.
@@ -448,3 +451,60 @@ func LazyIntersect[TL, TR any](tl *SubjectTree[TL], tr *SubjectTree[TR], cb func
})
}
}
// IntersectGSL will match all items in the given subject tree that
// have interest expressed in the given sublist. The callback will only be called
// once for each subject, regardless of overlapping subscriptions in the sublist.
func IntersectGSL[T any, SL comparable](t *SubjectTree[T], sl *gsl.GenericSublist[SL], cb func(subject []byte, val *T)) {
if t == nil || t.root == nil || sl == nil {
return
}
var _pre [256]byte
_intersectGSL(t.root, _pre[:0], sl, cb)
}
func _intersectGSL[T any, SL comparable](n node, pre []byte, sl *gsl.GenericSublist[SL], cb func(subject []byte, val *T)) {
if n.isLeaf() {
ln := n.(*leaf[T])
subj := append(pre, ln.suffix...)
if sl.HasInterest(bytesToString(subj)) {
cb(subj, &ln.value)
}
return
}
bn := n.base()
pre = append(pre, bn.prefix...)
for _, cn := range n.children() {
if cn == nil {
continue
}
subj := append(pre, cn.path()...)
if !hasInterestForTokens(sl, subj, len(pre)) {
continue
}
_intersectGSL(cn, pre, sl, cb)
}
}
// The subject tree can return partial tokens so we need to check starting interest
// only from whole tokens when we encounter a tsep.
func hasInterestForTokens[SL comparable](sl *gsl.GenericSublist[SL], subj []byte, since int) bool {
for i := since; i < len(subj); i++ {
if subj[i] == tsep {
if !sl.HasInterestStartingIn(bytesToString(subj[:i])) {
return false
}
}
}
return true
}
// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer.
// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same.
func bytesToString(b []byte) string {
if len(b) == 0 {
return ""
}
p := unsafe.SliceData(b)
return unsafe.String(p, len(b))
}
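bytesToString above converts a byte slice to a string without copying, by aliasing the slice's backing array; that is safe only while the slice outlives the string and is not mutated, which is why IntersectGSL uses it just for short-lived interest lookups during traversal. A small sketch showing both the conversion and the aliasing hazard:

package main

import (
	"fmt"
	"unsafe"
)

// bytesToString aliases the byte slice's backing array as a string, avoiding
// a copy. The string is only valid while the slice is neither mutated nor
// freed, so callers must treat it as ephemeral.
func bytesToString(b []byte) string {
	if len(b) == 0 {
		return ""
	}
	return unsafe.String(unsafe.SliceData(b), len(b))
}

func main() {
	b := []byte("foo.bar")
	s := bytesToString(b)
	fmt.Println(s) // foo.bar
	b[0] = 'z'     // mutating the slice is visible through the alias!
	fmt.Println(s) // zoo.bar, demonstrating why the string must stay short-lived
}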


@@ -21,8 +21,6 @@ import (
"sync"
"sync/atomic"
"unicode/utf8"
"github.com/nats-io/nats-server/v2/server/stree"
)
// Sublist is a routing mechanism to handle subject distribution and
@@ -818,7 +816,9 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool {
*nq += len(qsub)
}
}
return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0
if len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 {
return true
}
}
if pwc != nil {
if np != nil && nq != nil {
@@ -1726,63 +1726,3 @@ func getAllNodes(l *level, results *SublistResult) {
getAllNodes(n.next, results)
}
}
// IntersectStree will match all items in the given subject tree that
// have interest expressed in the given sublist. The callback will only be called
// once for each subject, regardless of overlapping subscriptions in the sublist.
func IntersectStree[T any](st *stree.SubjectTree[T], sl *Sublist, cb func(subj []byte, entry *T)) {
var _subj [255]byte
intersectStree(st, sl.root, _subj[:0], cb)
}
func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb func(subj []byte, entry *T)) {
nsubj := subj
if len(nsubj) > 0 {
nsubj = append(subj, '.')
}
if r.fwc != nil {
// We've reached a full wildcard, do a FWC match on the stree at this point
// and don't keep iterating downward.
nsubj := append(nsubj, '>')
st.Match(nsubj, cb)
return
}
if r.pwc != nil {
// We've found a partial wildcard. We'll keep iterating downwards, but first
// check whether there's interest at this level (without triggering dupes) and
// match if so.
var done bool
nsubj := append(nsubj, '*')
if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 {
st.Match(nsubj, cb)
done = true
}
if r.pwc.next.numNodes() > 0 {
intersectStree(st, r.pwc.next, nsubj, cb)
}
if done {
return
}
}
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
// A wildcard at the next level will already visit these descendents
// so skip so we don't callback the same subject more than once.
continue
}
nsubj := append(nsubj, t...)
if len(n.psubs)+len(n.qsubs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
}
}
if n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
}


@@ -1,4 +1,4 @@
// Copyright 2012-2024 The NATS Authors
// Copyright 2012-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at


@@ -21,7 +21,6 @@ package store
import (
"context"
"crypto/tls"
"fmt"
"strings"
"time"
@@ -33,7 +32,6 @@ import (
"github.com/opencloud-eu/reva/v2/pkg/store/etcd"
"github.com/opencloud-eu/reva/v2/pkg/store/memory"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
microstore "go-micro.dev/v4/store"
)
@@ -127,33 +125,19 @@ func Create(opts ...microstore.Option) microstore.Store {
return *ocMemStore
case TypeNatsJS:
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
store := natsjs.NewStore(
return natsjs.NewStore(
append(opts,
natsjs.NatsOptions(natsOptions), // always pass in properly initialized default nats options
natsjs.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
)
err := updateNatsStore(opts, ttl, natsOptions)
if err != nil {
options.Logger.Logf(logger.ErrorLevel, "failed to update nats-js store: '%s'", err.Error())
}
return store
case TypeNatsJSKV:
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
store := natsjskv.NewStore(
return natsjskv.NewStore(
append(opts,
natsjskv.NatsOptions(natsOptions), // always pass in properly initialized default nats options
natsjskv.EncodeKeys(), // nats has restrictions on the key, we cannot use slashes
natsjskv.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
)
err := updateNatsStore(opts, ttl, natsOptions)
if err != nil {
options.Logger.Logf(logger.ErrorLevel, "failed to update nats-js-kv store: '%s'", err.Error())
}
return store
case TypeMemory, "mem", "": // allow existing short form and use as default
return microstore.NewMemoryStore(opts...)
default:
@@ -162,58 +146,13 @@ func Create(opts ...microstore.Option) microstore.Store {
}
}
func updateNatsStore(opts []store.Option, ttl time.Duration, natsOptions nats.Options) error {
options := store.Options{}
for _, o := range opts {
o(&options)
}
bucketName := options.Database
if bucketName == "" {
return fmt.Errorf("bucket name (database) must be set")
}
if len(options.Nodes) > 0 {
natsOptions.Servers = options.Nodes
}
nc, err := natsOptions.Connect()
if err != nil {
return fmt.Errorf("could not connect to nats: %w", err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
return err
}
// NATS KV buckets are actually streams named "KV_<bucket_name>"
info, err := js.StreamInfo("KV_" + bucketName)
if err != nil {
return fmt.Errorf("failed to get bucket info: %w", err)
}
config := info.Config
config.MaxAge = ttl
_, err = js.UpdateStream(&config)
if err != nil {
return fmt.Errorf("failed to update bucket TTL: %w", err)
}
return nil
}
func natsConfig(log logger.Logger, ctx context.Context, opts []microstore.Option) ([]microstore.Option, time.Duration, nats.Options) {
if mem, _ := ctx.Value(disablePersistanceContextKey{}).(bool); mem {
opts = append(opts, natsjs.DefaultMemory())
}
ttl := time.Duration(0)
if d, ok := ctx.Value(ttlContextKey{}).(time.Duration); ok {
ttl = d
}
ttl, _ := ctx.Value(ttlContextKey{}).(time.Duration)
// preparing natsOptions before the switch to reuse the same code
natsOptions := nats.GetDefaultOptions()
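On the OpenCloud side, the post-creation updateNatsStore round trip is dropped: the TTL is now handed to the store constructor via DefaultTTL instead of patching the bucket's MaxAge afterwards. A rough sketch of the simplified construction path, using only the options visible above; the plugin import path and the server URL are assumptions:

package main

import (
	"time"

	natsjs "github.com/go-micro/plugins/v4/store/nats-js"
	"github.com/nats-io/nats.go"
)

func main() {
	natsOptions := nats.GetDefaultOptions()
	natsOptions.Servers = []string{"nats://127.0.0.1:4222"} // assumed endpoint

	st := natsjs.NewStore(
		natsjs.NatsOptions(natsOptions), // always pass in properly initialized nats options
		natsjs.DefaultTTL(24*time.Hour), // nats has no per-write TTL, so set a bucket default
	)
	defer st.Close()
}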

vendor/modules.txt

@@ -746,7 +746,7 @@ github.com/google/go-querystring/query
# github.com/google/go-tika v0.3.1
## explicit; go 1.11
github.com/google/go-tika/tika
# github.com/google/go-tpm v0.9.7
# github.com/google/go-tpm v0.9.8
## explicit; go 1.22
github.com/google/go-tpm/legacy/tpm2
github.com/google/go-tpm/tpmutil
@@ -873,7 +873,7 @@ github.com/justinas/alice
# github.com/kevinburke/ssh_config v1.2.0
## explicit
github.com/kevinburke/ssh_config
# github.com/klauspost/compress v1.18.2
# github.com/klauspost/compress v1.18.3
## explicit; go 1.23
github.com/klauspost/compress
github.com/klauspost/compress/flate
@@ -1155,7 +1155,7 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.8.0
## explicit; go 1.23.0
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.12.3
# github.com/nats-io/nats-server/v2 v2.12.4
## explicit; go 1.24.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand
@@ -1376,7 +1376,7 @@ github.com/opencloud-eu/icap-client
# github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
## explicit; go 1.18
github.com/opencloud-eu/libre-graph-api-go
# github.com/opencloud-eu/reva/v2 v2.42.1
# github.com/opencloud-eu/reva/v2 v2.42.0
## explicit; go 1.24.1
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
github.com/opencloud-eu/reva/v2/cmd/revad/runtime