Compare commits

..

10 Commits

Author SHA1 Message Date
Artur Neumann
663fe3740b Merge pull request #2236 from opencloud-eu/checkForVersionPlaceholders
ci: check for version placeholders
2026-01-29 18:25:42 +05:45
Artur Neumann
4a83b34b91 ci: check for version placeholders 2026-01-29 16:42:38 +05:45
Alex
8b4c218a5f chore: change target path for docs automation (#2235) 2026-01-29 11:05:45 +01:00
Sawjan Gurung
70623d6007 Merge pull request #2041 from opencloud-eu/test/test-hook-failure
[full-ci][tests-only] test: add hook failures to the test failures list
2026-01-29 10:08:25 +05:45
opencloudeu
cad6a61120 [tx] updated from transifex 2026-01-29 00:13:28 +00:00
Michael Flemming
754f0fa5b7 Merge pull request #2229 from opencloud-eu/fix_docs_gen_path
fix path and trigger for docsGenPr
2026-01-28 18:03:15 +01:00
Michael 'Flimmy' Flemming
38cf037a11 fix path and trigger for docsGenPr 2026-01-28 17:53:11 +01:00
OpenCloud Devops
9eac47dab4 🎉 Release 5.0.1 (#2218)
* 🎉 Release 5.0.1

* 🎉 Release 5.0.1

* 🎉 Release 5.0.1

* 🎉 Release 5.0.1
2026-01-28 16:06:09 +01:00
Viktor Scharf
e7c14d2ee4 reva-bump-2.42.1 (#2225) 2026-01-28 15:51:15 +01:00
Saw-jan
cd408c6923 test: add hook failures to the test failures list
Signed-off-by: Saw-jan <saw.jan.grg3e@gmail.com>
2026-01-28 11:29:48 +05:45
64 changed files with 697 additions and 937 deletions

View File

@@ -523,6 +523,7 @@ def main(ctx):
testPipelines(ctx)
build_release_pipelines = \
checkVersionPlaceholder() + \
dockerReleases(ctx) + \
binaryReleases(ctx)
@@ -1817,6 +1818,27 @@ def dockerReleases(ctx):
return pipelines
def checkVersionPlaceholder():
return [{
"name": "check-version-placeholder",
"steps": [
{
"name": "check-version-placeholder",
"image": OC_CI_ALPINE,
"commands": [
"grep -r -e '%%NEXT%%' -e '%%NEXT_PRODUCTION_VERSION%%' %s/services %s/pkg > next_version.txt" % (
dirs["base"],
dirs["base"],
),
'if [ -s next_version.txt ]; then echo "replace version placeholders"; cat next_version.txt; exit 1; fi',
],
},
],
"when": [
event["tag"],
],
}]
def dockerRelease(ctx, repo, build_type):
build_args = {
"REVISION": "%s" % ctx.build.commit,
@@ -2230,7 +2252,7 @@ def genDocsPr(ctx):
"MY_TARGET_BRANCH": "${CI_COMMIT_BRANCH##stable-}",
},
"commands": [
'export DOC_GIT_TARGET_FOLDER="$$(if [ \"$$MY_TARGET_BRANCH\" = \"main\" ]; then echo \"tmpdocs/docs/dev/_static/env-vars/\"; else echo \"tmpdocs/versioned_docs/version-$${MY_TARGET_BRANCH}/dev/_static/env-vars/\"; fi)"',
'export DOC_GIT_TARGET_FOLDER="$$(if [ \"$$MY_TARGET_BRANCH\" = \"main\" ]; then echo \"tmpdocs/docs/_static/env-vars/\"; else echo \"tmpdocs/versioned_docs/version-$${MY_TARGET_BRANCH}/_static/env-vars/\"; fi)"',
'echo "$${CI_SSH_KEY}" > /root/id_rsa && chmod 600 /root/id_rsa',
'git config --global user.email "devops@opencloud.eu"',
'git config --global user.name "openclouders"',
@@ -2255,8 +2277,8 @@ def genDocsPr(ctx):
},
{
"event": "cron",
"branch": "[main]",
"cron": "nightly *",
"branch": "main",
"cron": "nightly*",
},
],
}]

View File

@@ -1,5 +1,23 @@
# Changelog
## [5.0.1](https://github.com/opencloud-eu/opencloud/releases/tag/v5.0.1) - 2026-01-28
### ❤️ Thanks to all contributors! ❤️
@ScharfViktor, @aduffeck, @saw-jan
### 🐛 Bug Fixes
- Do not ever set a TTL for the ID cache. It's not supposed to expire. [[#2223](https://github.com/opencloud-eu/opencloud/pull/2223)]
### ✅ Tests
- test(api): wait for web-office readiness by checking discovery endpoint [[#2217](https://github.com/opencloud-eu/opencloud/pull/2217)]
### 📦️ Dependencies
- reva-bump-2.42.1 [[#2225](https://github.com/opencloud-eu/opencloud/pull/2225)]
## [5.0.0](https://github.com/opencloud-eu/opencloud/releases/tag/v5.0.0) - 2026-01-26
### ❤️ Thanks to all contributors! ❤️

8
go.mod
View File

@@ -55,7 +55,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.12.4
github.com/nats-io/nats-server/v2 v2.12.3
github.com/nats-io/nats.go v1.48.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.1.3
@@ -65,7 +65,7 @@ require (
github.com/open-policy-agent/opa v1.12.3
github.com/opencloud-eu/icap-client v0.0.0-20250930132611-28a2afe62d89
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
github.com/opencloud-eu/reva/v2 v2.42.0
github.com/opencloud-eu/reva/v2 v2.42.1
github.com/opensearch-project/opensearch-go/v4 v4.6.0
github.com/orcaman/concurrent-map v1.0.0
github.com/pkg/errors v0.9.1
@@ -241,7 +241,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.9.3 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/go-tpm v0.9.8 // indirect
github.com/google/go-tpm v0.9.7 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
github.com/google/renameio/v2 v2.0.1 // indirect
github.com/gookit/goutil v0.7.1 // indirect
@@ -261,7 +261,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/juliangruber/go-intersect v1.1.0 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
github.com/kovidgoyal/go-parallel v1.1.1 // indirect

16
go.sum
View File

@@ -575,8 +575,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/go-tika v0.3.1 h1:l+jr10hDhZjcgxFRfcQChRLo1bPXQeLFluMyvDhXTTA=
github.com/google/go-tika v0.3.1/go.mod h1:DJh5N8qxXIl85QkqmXknd+PeeRkUOTbvwyYf7ieDz6c=
github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo=
github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA=
github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -730,8 +730,8 @@ github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
@@ -916,8 +916,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts=
github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg=
github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs=
github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
@@ -969,8 +969,8 @@ github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9 h1:dIft
github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9/go.mod h1:JWyDC6H+5oZRdUJUgKuaye+8Ph5hEs6HVzVoPKzWSGI=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 h1:vD/EdfDUrv4omSFjrinT8Mvf+8D7f9g4vgQ2oiDrVUI=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q=
github.com/opencloud-eu/reva/v2 v2.42.0 h1:CWlXbNqUSduQ5Afi6XoegoJ/zyV0Vx5UoPKAZZmEAq4=
github.com/opencloud-eu/reva/v2 v2.42.0/go.mod h1:pv+w23JG0/qJweZbTzNNev//YEvlUML1L/2iXgKGkkg=
github.com/opencloud-eu/reva/v2 v2.42.1 h1:QUZOLSfAhb7bw+qsVSFMFY644rUz4/NtnOiJ0QQxj2o=
github.com/opencloud-eu/reva/v2 v2.42.1/go.mod h1:pv+w23JG0/qJweZbTzNNev//YEvlUML1L/2iXgKGkkg=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=

View File

@@ -34,7 +34,7 @@ var (
// LatestTag is the latest released version plus the dev meta version.
// Will be overwritten by the release pipeline
// Needs a manual change for every tagged release
LatestTag = "5.0.0+dev"
LatestTag = "5.0.1+dev"
// Date indicates the build date.
// This has been removed, it looks like you can only replace static strings with recent go versions

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Ivan Fustero, 2025\n"
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Jörn Friedrich Dreyer <jfd@butonic.de>, 2025\n"
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Elías Martín, 2025\n"
"Language-Team: Spanish (https://app.transifex.com/opencloud-eu/teams/204053/es/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Simone Broglia, 2025\n"
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"

View File

@@ -12,7 +12,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Junghyuk Kwon <kwon@junghy.uk>, 2025\n"
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: YQS Yang, 2025\n"
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Ivan Fustero, 2025\n"
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Jörn Friedrich Dreyer <jfd@butonic.de>, 2025\n"
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Elías Martín, 2025\n"
"Language-Team: Spanish (https://app.transifex.com/opencloud-eu/teams/204053/es/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Simone Broglia, 2025\n"
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: gapho shin, 2025\n"
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: YQS Yang, 2025\n"
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Ivan Fustero, 2025\n"
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"

View File

@@ -12,7 +12,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Jonas, 2025\n"
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Simone Broglia, 2025\n"
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: gapho shin, 2025\n"
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: YQS Yang, 2025\n"
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Ivan Fustero, 2025\n"
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Jörn Friedrich Dreyer <jfd@butonic.de>, 2025\n"
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"

View File

@@ -12,7 +12,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Alejandro Robles, 2025\n"
"Language-Team: Spanish (https://app.transifex.com/opencloud-eu/teams/204053/es/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Simone Pagano, 2025\n"
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: gapho shin, 2025\n"
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"

View File

@@ -12,7 +12,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Lulufox, 2025\n"
"Language-Team: Russian (https://app.transifex.com/opencloud-eu/teams/204053/ru/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Davis Kaza, 2025\n"
"Language-Team: Swedish (https://app.transifex.com/opencloud-eu/teams/204053/sv/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: YQS Yang, 2025\n"
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Ivan Fustero, 2025\n"
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Jörn Friedrich Dreyer <jfd@butonic.de>, 2025\n"
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Elías Martín, 2025\n"
"Language-Team: Spanish (https://app.transifex.com/opencloud-eu/teams/204053/es/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Simone Broglia, 2025\n"
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: gapho shin, 2025\n"
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: Lulufox, 2025\n"
"Language-Team: Russian (https://app.transifex.com/opencloud-eu/teams/204053/ru/)\n"

View File

@@ -11,7 +11,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: EMAIL\n"
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
"Last-Translator: YQS Yang, 2025\n"
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"

View File

@@ -279,14 +279,16 @@ function run_behat_tests() {
fi
fi
FAILED_SCENARIO_PATHS_COLORED=`awk '/Failed scenarios:/',0 ${TEST_LOG_FILE} | grep -a feature`
FAILED_SCENARIO_PATHS_COLORED=`awk '/Failed (scenarios|hooks):/',0 ${TEST_LOG_FILE} | grep -a feature`
# There will be some ANSI escape codes for color in the FEATURE_COLORED var.
# Strip them out so we can pass just the ordinary feature details to Behat.
# Also strip everything after ".feature:XX", including text such as "(on line xx)" added by Behat indicating the failing step's line number.
# Thanks to https://en.wikipedia.org/wiki/Tee_(command) and
# https://stackoverflow.com/questions/23416278/how-to-strip-ansi-escape-sequences-from-a-variable
# for ideas.
FAILED_SCENARIO_PATHS=$(echo "${FAILED_SCENARIO_PATHS_COLORED}" | sed "s/\x1b[^m]*m//g" | sed 's/\(\.feature:[0-9]\+\).*/\1/')
FAILED_SCENARIO_PATHS=$(echo "${FAILED_SCENARIO_PATHS_COLORED}" | sed "s/\x1b[^m]*m//g" | sed "s/AfterScenario \"//g" | sed 's/\(\.feature:[0-9]\+\).*/\1/')
# remove duplicate scenario paths
FAILED_SCENARIO_PATHS=$(echo "$FAILED_SCENARIO_PATHS" | awk '!seen[$0]++')
# If something else went wrong, and there were no failed scenarios,
# then the awk, grep, sed command sequence above ends up with an empty string.

View File

@@ -60,9 +60,6 @@ type parser struct {
// pedantic reports error when configuration is not correct.
pedantic bool
// Tracks environment variable references, to avoid cycles
envVarReferences map[string]bool
}
// Parse will return a map of keys to any, although concrete types
@@ -183,37 +180,16 @@ func (t *token) Position() int {
return t.item.pos
}
func newParser(data, fp string, pedantic bool) *parser {
return &parser{
mapping: make(map[string]any),
lx: lex(data),
ctxs: make([]any, 0, 4),
keys: make([]string, 0, 4),
ikeys: make([]item, 0, 4),
fp: filepath.Dir(fp),
pedantic: pedantic,
envVarReferences: make(map[string]bool),
func parse(data, fp string, pedantic bool) (p *parser, err error) {
p = &parser{
mapping: make(map[string]any),
lx: lex(data),
ctxs: make([]any, 0, 4),
keys: make([]string, 0, 4),
ikeys: make([]item, 0, 4),
fp: filepath.Dir(fp),
pedantic: pedantic,
}
}
func parse(data, fp string, pedantic bool) (*parser, error) {
p := newParser(data, fp, pedantic)
if err := p.parse(fp); err != nil {
return nil, err
}
return p, nil
}
func parseEnv(data string, parent *parser) (*parser, error) {
p := newParser(data, "", false)
p.envVarReferences = parent.envVarReferences
if err := p.parse(""); err != nil {
return nil, err
}
return p, nil
}
func (p *parser) parse(fp string) error {
p.pushContext(p.mapping)
var prevItem item
@@ -223,16 +199,16 @@ func (p *parser) parse(fp string) error {
// Here we allow the final character to be a bracket '}'
// in order to support JSON like configurations.
if prevItem.typ == itemKey && prevItem.val != mapEndString {
return fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos)
return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos)
}
break
}
prevItem = it
if err := p.processItem(it, fp); err != nil {
return err
return nil, err
}
}
return nil
return p, nil
}
func (p *parser) next() item {
@@ -477,18 +453,11 @@ func (p *parser) lookupVariable(varReference string) (any, bool, error) {
}
// If we are here, we have exhausted our context maps and still not found anything.
// Detect reference cycles
if p.envVarReferences[varReference] {
return nil, false, fmt.Errorf("variable reference cycle for '%s'", varReference)
}
p.envVarReferences[varReference] = true
defer delete(p.envVarReferences, varReference)
// Parse from the environment
// Parse from the environment.
if vStr, ok := os.LookupEnv(varReference); ok {
// Everything we get here will be a string value, so we need to process as a parser would.
if subp, err := parseEnv(fmt.Sprintf("%s=%s", pkey, vStr), p); err == nil {
v, ok := subp.mapping[pkey]
if vmap, err := Parse(fmt.Sprintf("%s=%s", pkey, vStr)); err == nil {
v, ok := vmap[pkey]
return v, ok, nil
} else {
return nil, false, err

View File

@@ -299,7 +299,6 @@ func (a *Account) shallowCopy(na *Account) {
na.Nkey = a.Nkey
na.Issuer = a.Issuer
na.traceDest, na.traceDestSampling = a.traceDest, a.traceDestSampling
na.nrgAccount = a.nrgAccount
if a.imports.streams != nil {
na.imports.streams = make([]*streamImport, 0, len(a.imports.streams))

View File

@@ -1,4 +1,4 @@
// Copyright 2023-2025 The NATS Authors
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2026 The NATS Authors
// Copyright 2012-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand"
"net"
"net/http"
@@ -36,6 +35,8 @@ import (
"sync/atomic"
"time"
"slices"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats-server/v2/internal/fastrand"
@@ -2731,12 +2732,9 @@ func (c *client) updateS2AutoCompressionLevel(co *CompressionOpts, compression *
}
// Will return the parts from the raw wire msg.
// We return the `hdr` as a slice that is capped to the length of the headers
// so that if the caller later tries to append to the returned header slice it
// does not affect the message content.
func (c *client) msgParts(data []byte) (hdr []byte, msg []byte) {
if c != nil && c.pa.hdr > 0 {
return data[:c.pa.hdr:c.pa.hdr], data[c.pa.hdr:]
return data[:c.pa.hdr], data[c.pa.hdr:]
}
return nil, data
}
@@ -3339,7 +3337,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
sub.shadow = nil
if len(shadowSubs) > 0 {
isSpokeLeaf = c.isSpokeLeafNode()
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF || c.kind == JETSTREAM) && c.srv != nil
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil
}
sub.close()
c.mu.Unlock()
@@ -4567,19 +4565,6 @@ func getHeaderKeyIndex(key string, hdr []byte) int {
}
}
// setHeader will replace the value of the first existing key `key`
// with the given value `val`, or add this new key at the end of
// the headers.
//
// Note: If the key does not exist, or if it exists but the new value
// would make the resulting byte slice larger than the original one,
// a new byte slice is returned and the original is left untouched.
// This is to prevent situations where caller may have a `hdr` and
// `msg` that are the parts of an underlying buffer. Extending the
// `hdr` would otherwise overwrite the `msg` part.
//
// If the new value is smaller, then the original `hdr` byte slice
// is modified.
func setHeader(key, val string, hdr []byte) []byte {
start := getHeaderKeyIndex(key, hdr)
if start >= 0 {
@@ -4594,45 +4579,15 @@ func setHeader(key, val string, hdr []byte) []byte {
return hdr // malformed headers
}
valEnd += valStart
// Length of the existing value (before the `\r`)
oldValLen := valEnd - valStart
// This is how many extra bytes we need for the new value.
// If <= 0, it means that we need less and so will reuse the `hdr` buffer.
if extra := len(val) - oldValLen; extra > 0 {
// Check that we don't overflow an "int".
if rem := math.MaxInt - hdrLen; rem < extra {
// We don't grow, and return the existing header.
return hdr
}
// The new size is the old size plus the extra bytes.
newHdrSize := hdrLen + extra
newHdr := make([]byte, newHdrSize)
// Copy the parts from `hdr` and `val` into the new buffer.
n := copy(newHdr, hdr[:valStart])
n += copy(newHdr[n:], val)
copy(newHdr[n:], hdr[valEnd:])
return newHdr
}
// We can write in place since it fits in the existing `hdr` buffer.
n := copy(hdr[valStart:], val)
n += copy(hdr[valStart+n:], hdr[valEnd:])
hdr = hdr[:valStart+n]
return hdr
suffix := slices.Clone(hdr[valEnd:])
newHdr := append(hdr[:valStart], val...)
return append(newHdr, suffix...)
}
if len(hdr) > 0 && bytes.HasSuffix(hdr, []byte("\r\n")) {
hdr = hdr[:len(hdr)-2]
val += "\r\n"
}
// Create the new buffer based on length of existing one and
// length of the new "<key>: <value>\r\n". Protect against "int" overflow.
newSize := uint64(len(hdr)) + uint64(len(key)) + 1 + 1 + uint64(len(val)) + 2
if newSize > uint64(math.MaxInt) {
// We don't grow, and return the existing header.
return hdr
}
newHdr := make([]byte, 0, int(newSize))
newHdr = append(newHdr, hdr...)
return fmt.Appendf(newHdr, "%s: %s\r\n", key, val)
return fmt.Appendf(hdr, "%s: %s\r\n", key, val)
}
// For bytes.HasPrefix below.

View File

@@ -66,7 +66,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.12.4"
VERSION = "2.12.3"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2026 The NATS Authors
// Copyright 2019-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1030,11 +1030,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
if cName != _EMPTY_ {
if eo, ok := mset.consumers[cName]; ok {
mset.mu.Unlock()
if action == ActionCreate {
ocfg := eo.config()
copyConsumerMetadata(config, &ocfg)
if !reflect.DeepEqual(config, &ocfg) {
mset.mu.Unlock()
return nil, NewJSConsumerAlreadyExistsError()
}
}
@@ -1042,11 +1042,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if cfg.Retention == WorkQueuePolicy {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if !mset.partitionUnique(cName, subjects) {
mset.mu.Unlock()
return nil, NewJSConsumerWQConsumerNotUniqueError()
}
}
mset.mu.Unlock()
err := eo.updateConfig(config)
if err == nil {
return eo, nil
@@ -1544,6 +1542,7 @@ func (o *consumer) setLeader(isLeader bool) {
if o.cfg.AckPolicy != AckNone {
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}
}
@@ -1552,6 +1551,7 @@ func (o *consumer) setLeader(isLeader bool) {
// Will error if wrong mode to provide feedback to users.
if o.reqSub, err = o.subscribeInternal(o.nextMsgSubj, o.processNextMsgReq); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}
@@ -1561,6 +1561,7 @@ func (o *consumer) setLeader(isLeader bool) {
fcsubj := fmt.Sprintf(jsFlowControl, stream, o.name)
if o.fcSub, err = o.subscribeInternal(fcsubj, o.processFlowControl); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}
}
@@ -2400,8 +2401,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// Check for Subject Filters update.
newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
updatedFilters := !subjectSliceEqual(newSubjects, o.subjf.subjects())
if updatedFilters {
if !subjectSliceEqual(newSubjects, o.subjf.subjects()) {
newSubjf := make(subjectFilters, 0, len(newSubjects))
for _, newFilter := range newSubjects {
fs := &subjectFilter{
@@ -2440,18 +2440,16 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// Allowed but considered no-op, [Description, SampleFrequency, MaxWaiting, HeadersOnly]
o.cfg = *cfg
if updatedFilters {
// Cleanup messages that lost interest.
if o.retention == InterestPolicy {
o.mu.Unlock()
o.cleanupNoInterestMessages(o.mset, false)
o.mu.Lock()
}
// Re-calculate num pending on update.
o.streamNumPending()
// Cleanup messages that lost interest.
if o.retention == InterestPolicy {
o.mu.Unlock()
o.cleanupNoInterestMessages(o.mset, false)
o.mu.Lock()
}
// Re-calculate num pending on update.
o.streamNumPending()
return nil
}
@@ -5117,14 +5115,9 @@ func (o *consumer) checkNumPending() (uint64, error) {
var state StreamState
o.mset.store.FastState(&state)
npc := o.numPending()
// Make sure we can't report more messages than there are.
// TODO(nat): It's not great that this means consumer info has side effects,
// since we can't know whether anyone will call it or not. The previous num
// pending calculation that this replaces had the same problem though.
if o.sseq > state.LastSeq {
o.npc = 0
} else if npc > 0 {
o.npc = int64(min(npc, state.Msgs, state.LastSeq-o.sseq+1))
if o.sseq > state.LastSeq && npc > 0 || npc > state.Msgs {
// Re-calculate.
return o.streamNumPending()
}
}
return o.numPending(), nil
@@ -5372,15 +5365,6 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
o.pending = make(map[uint64]*Pending)
}
now := time.Now()
if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
p.Timestamp = now.UnixNano()
} else {
o.pending[sseq] = &Pending{dseq, now.UnixNano()}
}
// We could have a backoff that set a timer higher than what we need for this message.
// In that case, reset to lowest backoff required for a message redelivery.
minDelay := o.ackWait(0)
@@ -5393,10 +5377,18 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
}
minDelay = o.ackWait(o.cfg.BackOff[bi])
}
minDeadline := now.Add(minDelay)
minDeadline := time.Now().Add(minDelay)
if o.ptmr == nil || o.ptmrEnd.After(minDeadline) {
o.resetPtmr(minDelay)
}
if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
p.Timestamp = time.Now().UnixNano()
} else {
o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()}
}
}
// Credit back a failed delivery.
@@ -6511,10 +6503,6 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
retryAsflr = seq
}
} else if seq <= dflr {
// Store the first entry above our ack floor, so we don't need to look it up again on retryAsflr=0.
if retryAsflr == 0 {
retryAsflr = seq
}
// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
if _, ok := state.Pending[seq]; !ok {
// The filters are already taken into account,
@@ -6526,18 +6514,8 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}
}
// If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it.
// However, our ack floor may be lower than the next message we can receive, so we correct it upward if needed.
if retryAsflr == 0 {
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, asflr+1, &smv)
} else {
_, nseq, err = store.LoadNextMsg(filter, wc, asflr+1, &smv)
}
if err == nil {
retryAsflr = max(asflr+1, nseq)
} else if err == ErrStoreEOF {
retryAsflr = ss.LastSeq + 1
}
retryAsflr = asflr + 1
}
o.mu.Lock()

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2026 The NATS Authors
// Copyright 2019-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -201,7 +201,7 @@ type fileStore struct {
sips int
dirty int
closing bool
closed atomic.Bool // Atomic to reduce contention on ConsumerStores.
closed bool
fip bool
receivedAny bool
firstMoved bool
@@ -473,18 +473,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
_, err = os.Stat(keyFile)
// Either the file should exist (err=nil), or it shouldn't. Any other error is reported.
if err != nil && !os.IsNotExist(err) {
return nil, err
}
// Make sure we do not have an encrypted store underneath of us but no main key.
if fs.prf == nil && err == nil {
return nil, errNoMainKey
} else if fs.prf != nil && err == nil {
// If encryption is configured and the key file exists, recover our keys.
if err = fs.recoverAEK(); err != nil {
return nil, err
if fs.prf == nil {
if _, err := os.Stat(keyFile); err == nil {
return nil, errNoMainKey
}
}
@@ -1792,14 +1784,16 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}
// Decrypt if needed.
// We can be setup for encryption but if this is a snapshot restore we will be missing the keyfile
// since snapshots strip encryption.
if fs.prf != nil && fs.aek != nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
if err != nil {
fs.warn("Stream state error reading encryption key: %v", err)
return err
if fs.prf != nil {
// We can be setup for encryption but if this is a snapshot restore we will be missing the keyfile
// since snapshots strip encryption.
if err := fs.recoverAEK(); err == nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
if err != nil {
fs.warn("Stream state error reading encryption key: %v", err)
return err
}
}
}
@@ -2352,7 +2346,7 @@ func (fs *fileStore) recoverMsgs() error {
if fs.ld != nil {
var emptyBlks []*msgBlock
for _, mb := range fs.blks {
if mb.msgs == 0 && mb.rbytes == 0 && mb != fs.lmb {
if mb.msgs == 0 && mb.rbytes == 0 {
emptyBlks = append(emptyBlks, mb)
}
}
@@ -2594,7 +2588,7 @@ func copyMsgBlocks(src []*msgBlock) []*msgBlock {
func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
fs.mu.RLock()
lastSeq := fs.state.LastSeq
closed := fs.isClosed()
closed := fs.closed
fs.mu.RUnlock()
if closed {
@@ -2609,51 +2603,20 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
fseq := atomic.LoadUint64(&mb.first.seq)
lseq := atomic.LoadUint64(&mb.last.seq)
var (
smv StoreMsg
cts int64
cseq uint64
off uint64
)
var smv StoreMsg
ts := t.UnixNano()
// Using a binary search, but need to be aware of interior deletes in the block.
seq := lseq + 1
loop:
for fseq <= lseq {
mid := fseq + (lseq-fseq)/2
off = 0
// Potentially skip over gaps. We keep the original middle but keep track of a
// potential delete range with an offset.
for {
sm, _, err := mb.fetchMsgNoCopy(mid+off, &smv)
if err != nil || sm == nil {
off++
if mid+off <= lseq {
continue
} else {
// Continue search to the left. Purposely ignore the skipped deletes here.
lseq = mid - 1
continue loop
}
}
cts = sm.ts
cseq = sm.seq
break
}
if cts >= ts {
seq = cseq
if mid == fseq {
break
}
// Continue search to the left.
lseq = mid - 1
} else {
// Continue search to the right (potentially skipping over interior deletes).
fseq = mid + off + 1
}
// Because sort.Search expects range [0,off), we have to manually
// calculate the offset from the first sequence.
off := int(lseq - fseq + 1)
i := sort.Search(off, func(i int) bool {
sm, _, _ := mb.fetchMsgNoCopy(fseq+uint64(i), &smv)
return sm != nil && sm.ts >= ts
})
if i < off {
return fseq + uint64(i)
}
return seq
return 0
}
// Find the first matching message against a sublist.
@@ -2669,15 +2632,13 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
mb.mu.Unlock()
}()
if mb.fssNotLoaded() {
// Make sure we have fss loaded.
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil, false, err
}
didLoad = true
}
// Mark fss activity.
mb.lsts = ats.AccessTime()
// Make sure to start at mb.first.seq if fseq < mb.first.seq
if seq := atomic.LoadUint64(&mb.first.seq); seq > start {
@@ -2685,6 +2646,10 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
}
lseq := atomic.LoadUint64(&mb.last.seq)
if sm == nil {
sm = new(StoreMsg)
}
// If the FSS state has fewer entries than sequences in the linear scan,
// then use intersection instead as likely going to be cheaper. This will
// often be the case with high numbers of deletes, as well as a smaller
@@ -2692,11 +2657,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
if uint64(mb.fss.Size()) < lseq-start {
// If there are no subject matches then this is effectively no-op.
hseq := uint64(math.MaxUint64)
var ierr error
stree.IntersectGSL(mb.fss, sl, func(subj []byte, ss *SimpleState) {
if ierr != nil {
return
}
gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) {
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
// mb is already loaded into the cache so should be fast-ish.
mb.recalculateForSubj(bytesToString(subj), ss)
@@ -2708,16 +2669,6 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
// than our first seq for this subject.
return
}
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if ierr = mb.loadMsgsWithLock(); ierr != nil {
return
}
didLoad = true
}
if sm == nil {
sm = new(StoreMsg)
}
if first == ss.First {
// If the start floor is below where this subject starts then we can
// short-circuit, avoiding needing to scan for the next message.
@@ -2752,24 +2703,10 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
mb.llseq = llseq
}
})
if ierr != nil {
return nil, false, ierr
}
if hseq < uint64(math.MaxUint64) && sm != nil {
return sm, didLoad && start == lseq, nil
}
} else {
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil, false, err
}
didLoad = true
}
if sm == nil {
sm = new(StoreMsg)
}
for seq := start; seq <= lseq; seq++ {
if mb.dmap.Exists(seq) {
// Optimisation to avoid calling cacheLookup which hits time.Now().
@@ -2836,10 +2773,6 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
isAll = true
})
}
// If the only subject in this block isn't our filter, can simply short-circuit.
if !isAll {
return nil, didLoad, ErrStoreMsgNotFound
}
}
// Make sure to start at mb.first.seq if fseq < mb.first.seq
fseq = max(fseq, atomic.LoadUint64(&mb.first.seq))
@@ -2976,7 +2909,7 @@ func (mb *msgBlock) prevMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *S
if uint64(mb.fss.Size()) < start-lseq {
// If there are no subject matches then this is effectively no-op.
hseq := uint64(0)
stree.IntersectGSL(mb.fss, sl, func(subj []byte, ss *SimpleState) {
gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) {
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
// mb is already loaded into the cache so should be fast-ish.
mb.recalculateForSubj(bytesToString(subj), ss)
@@ -3255,30 +3188,6 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool, bi int) (int, e
if start == uint32(math.MaxUint32) {
return -1, ErrStoreEOF
}
return fs.selectSkipFirstBlock(bi, start, stop)
}
// This is used to see if we can selectively jump start blocks based on filter subjects and a starting block index.
// Will return -1 and ErrStoreEOF if no matches at all or no more from where we are.
func (fs *fileStore) checkSkipFirstBlockMulti(sl *gsl.SimpleSublist, bi int) (int, error) {
// Move through psim to gather start and stop bounds.
start, stop := uint32(math.MaxUint32), uint32(0)
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
})
// Nothing was found.
if start == uint32(math.MaxUint32) {
return -1, ErrStoreEOF
}
return fs.selectSkipFirstBlock(bi, start, stop)
}
func (fs *fileStore) selectSkipFirstBlock(bi int, start, stop uint32) (int, error) {
// Can not be nil so ok to inline dereference.
mbi := fs.blks[bi].getIndex()
// All matching msgs are behind us.
@@ -4097,7 +4006,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
mb := fs.blks[seqStart]
bi := mb.index
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) {
// If the select blk start is greater than entry's last blk skip.
if bi > psi.lblk {
return
@@ -4196,7 +4105,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
var t uint64
var havePartial bool
var updateLLTS bool
stree.IntersectGSL[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
gsl.IntersectStree[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
subj := bytesToString(bsubj)
if havePartial {
// If we already found a partial then don't do anything else.
@@ -4259,7 +4168,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
// If we are here it's better to calculate totals from psim and adjust downward by scanning less blocks.
start := uint32(math.MaxUint32)
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) {
total += psi.total
// Keep track of start index for this subject.
if psi.fblk < start {
@@ -4310,7 +4219,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
}
// Mark fss activity.
mb.lsts = ats.AccessTime()
stree.IntersectGSL(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
gsl.IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
adjust += ss.Msgs
})
}
@@ -4577,7 +4486,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
// Stores a raw message with expected sequence number and timestamp.
// Lock should be held.
func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, ttl int64) (err error) {
if fs.isClosed() {
if fs.closed {
return ErrStoreClosed
}
@@ -5250,7 +5159,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
fsLock()
if fs.isClosed() {
if fs.closed {
fsUnlock()
return false, ErrStoreClosed
}
@@ -5460,8 +5369,14 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// If we have a callback registered we need to release lock regardless since cb might need it to lookup msg, etc.
fs.mu.Unlock()
// Storage updates.
delta := int64(msz)
cb(-1, -delta, seq, sm.subj)
if cb != nil {
var subj string
if sm != nil {
subj = sm.subj
}
delta := int64(msz)
cb(-1, -delta, seq, subj)
}
if !needFSLock {
fs.mu.Lock()
@@ -5696,7 +5611,10 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
}
func (fs *fileStore) isClosed() bool {
return fs.closed.Load()
fs.mu.RLock()
closed := fs.closed
fs.mu.RUnlock()
return closed
}
// Will spin up our flush loop.
@@ -7077,10 +6995,11 @@ func (mb *msgBlock) ensureRawBytesLoaded() error {
// Sync msg and index files as needed. This is called from a timer.
func (fs *fileStore) syncBlocks() {
if fs.isClosed() {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return
}
fs.mu.Lock()
blks := append([]*msgBlock(nil), fs.blks...)
lmb, firstMoved, firstSeq := fs.lmb, fs.firstMoved, fs.state.FirstSeq
// Clear first moved.
@@ -7170,10 +7089,11 @@ func (fs *fileStore) syncBlocks() {
}
}
if fs.isClosed() {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return
}
fs.mu.Lock()
fs.setSyncTimer()
if markDirty {
fs.dirty++
@@ -8019,14 +7939,17 @@ func (fs *fileStore) msgForSeq(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
// Will return message for the given sequence number.
func (fs *fileStore) msgForSeqLocked(seq uint64, sm *StoreMsg, needFSLock bool) (*StoreMsg, error) {
if fs.isClosed() {
return nil, ErrStoreClosed
}
// TODO(dlc) - Since Store, Remove, Skip all hold the write lock on fs this will
// be stalled. Need another lock if want to happen in parallel.
if needFSLock {
fs.mu.RLock()
}
if fs.closed {
if needFSLock {
fs.mu.RUnlock()
}
return nil, ErrStoreClosed
}
// Indicates we want first msg.
if seq == 0 {
seq = fs.state.FirstSeq
@@ -8205,14 +8128,10 @@ func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
// loadLast will load the last message for a subject. Subject should be non empty and not ">".
func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err error) {
if fs.isClosed() {
return nil, ErrStoreClosed
}
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.lmb == nil {
if fs.closed || fs.lmb == nil {
return nil, ErrStoreClosed
}
@@ -8309,15 +8228,15 @@ func (fs *fileStore) LoadLastMsg(subject string, smv *StoreMsg) (sm *StoreMsg, e
// LoadNextMsgMulti will find the next message matching any entry in the sublist.
func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
if fs.isClosed() {
return nil, 0, ErrStoreClosed
}
if sl == nil {
return fs.LoadNextMsg(_EMPTY_, false, start, smp)
}
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed {
return nil, 0, ErrStoreClosed
}
if fs.state.Msgs == 0 || start > fs.state.LastSeq {
return nil, fs.state.LastSeq, ErrStoreEOF
}
@@ -8325,31 +8244,6 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
start = fs.state.FirstSeq
}
// If start is less than or equal to beginning of our stream, meaning our first call,
// let's check the psim to see if we can skip ahead.
if start <= fs.state.FirstSeq {
var total uint64
blkStart := uint32(math.MaxUint32)
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
total += psi.total
// Keep track of start index for this subject.
if psi.fblk < blkStart {
blkStart = psi.fblk
}
})
// Nothing available.
if total == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// We can skip ahead.
if mb := fs.bim[blkStart]; mb != nil {
fseq := atomic.LoadUint64(&mb.first.seq)
if fseq > start {
start = fseq
}
}
}
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
for i := bi; i < len(fs.blks); i++ {
mb := fs.blks[i]
@@ -8360,28 +8254,8 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
} else {
// Nothing found in this block. We missed, if first block (bi) check psim.
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
// We should not do this at all if we are already on the last block.
if i == bi && i < len(fs.blks)-1 {
nbi, err := fs.checkSkipFirstBlockMulti(sl, bi)
// Nothing available.
if err == ErrStoreEOF {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi > i {
i = nbi - 1 // For the iterator condition i++
}
}
// Check if we can expire.
if expireOk {
mb.tryForceExpireCache()
}
} else if expireOk {
mb.tryForceExpireCache()
}
}
}
@@ -8391,13 +8265,12 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
}
func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, uint64, error) {
if fs.isClosed() {
return nil, 0, ErrStoreClosed
}
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed {
return nil, 0, ErrStoreClosed
}
if fs.state.Msgs == 0 || start > fs.state.LastSeq {
return nil, fs.state.LastSeq, ErrStoreEOF
}
@@ -8450,7 +8323,7 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
i = nbi - 1 // For the iterator condition i++
}
}
// Check if we can expire.
// Check is we can expire.
if expireOk {
mb.tryForceExpireCache()
}
@@ -8463,13 +8336,12 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
if fs.isClosed() {
return nil, ErrStoreClosed
}
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed {
return nil, ErrStoreClosed
}
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
return nil, ErrStoreEOF
}
@@ -8517,10 +8389,6 @@ func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err
// LoadPrevMsgMulti will find the previous message matching any entry in the sublist.
func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
if fs.isClosed() {
return nil, 0, ErrStoreClosed
}
if sl == nil {
sm, err = fs.LoadPrevMsg(start, smp)
return
@@ -8528,6 +8396,9 @@ func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.closed {
return nil, 0, ErrStoreClosed
}
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
return nil, fs.state.FirstSeq, ErrStoreEOF
}
@@ -9081,12 +8952,12 @@ func (fs *fileStore) Purge() (uint64, error) {
}
func (fs *fileStore) purge(fseq uint64) (uint64, error) {
if fs.isClosed() {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return 0, ErrStoreClosed
}
fs.mu.Lock()
purged := fs.state.Msgs
rbytes := int64(fs.state.Bytes)
@@ -9194,11 +9065,6 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
fs.mu.Unlock()
return fs.purge(seq)
}
// Short-circuit if the store was already compacted past this point.
if fs.state.FirstSeq > seq {
fs.mu.Unlock()
return purged, nil
}
// We have to delete interior messages.
smb := fs.selectMsgBlock(seq)
if smb == nil {
@@ -9246,12 +9112,7 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
if err = smb.loadMsgsWithLock(); err != nil {
goto SKIP
}
defer func() {
// The lock is released once we get here, so need to re-acquire.
smb.mu.Lock()
smb.finishedWithCache()
smb.mu.Unlock()
}()
defer smb.finishedWithCache()
}
for mseq := atomic.LoadUint64(&smb.first.seq); mseq < seq; mseq++ {
sm, err := smb.cacheLookupNoCopy(mseq, &smv)
@@ -9429,12 +9290,12 @@ SKIP:
// Will completely reset our store.
func (fs *fileStore) reset() error {
if fs.isClosed() {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return ErrStoreClosed
}
fs.mu.Lock()
var purged, bytes uint64
cb := fs.scb
@@ -9520,10 +9381,6 @@ func (mb *msgBlock) tombsLocked() []msgId {
// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
func (fs *fileStore) Truncate(seq uint64) error {
if fs.isClosed() {
return ErrStoreClosed
}
// Check for request to reset.
if seq == 0 {
return fs.reset()
@@ -9531,6 +9388,11 @@ func (fs *fileStore) Truncate(seq uint64) error {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return ErrStoreClosed
}
// Any existing state file will no longer be applicable. We will force write a new one
// at the end, after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
@@ -9820,14 +9682,6 @@ func (fs *fileStore) purgeMsgBlock(mb *msgBlock) {
mb.finishedWithCache()
mb.mu.Unlock()
fs.selectNextFirst()
if cb := fs.scb; cb != nil {
// If we have a callback registered, we need to release lock regardless since consumers will recalculate pending.
fs.mu.Unlock()
// Storage updates.
cb(-int64(msgs), -int64(bytes), 0, _EMPTY_)
fs.mu.Lock()
}
}
// Called by purge to simply get rid of the cache and close our fds.
@@ -10039,9 +9893,6 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
func (fs *fileStore) resetGlobalPerSubjectInfo() {
// Clear any global subject state.
fs.psim, fs.tsl = fs.psim.Empty(), 0
if fs.noTrackSubjects() {
return
}
for _, mb := range fs.blks {
fs.populateGlobalPerSubjectInfo(mb)
}
@@ -10379,10 +10230,6 @@ func (fs *fileStore) forceWriteFullStateLocked() error {
// 3. MBs - Index, Bytes, First and Last Sequence and Timestamps, and the deleted map (avl.seqset).
// 4. Last block index and hash of record inclusive to this stream state.
func (fs *fileStore) _writeFullState(force, needLock bool) error {
if fs.isClosed() {
return nil
}
fsLock := func() {
if needLock {
fs.mu.Lock()
@@ -10396,7 +10243,7 @@ func (fs *fileStore) _writeFullState(force, needLock bool) error {
start := time.Now()
fsLock()
if fs.dirty == 0 {
if fs.closed || fs.dirty == 0 {
fsUnlock()
return nil
}
@@ -10647,12 +10494,8 @@ func (fs *fileStore) Stop() error {
// Stop the current filestore.
func (fs *fileStore) stop(delete, writeState bool) error {
if fs.isClosed() {
return ErrStoreClosed
}
fs.mu.Lock()
if fs.closing {
if fs.closed || fs.closing {
fs.mu.Unlock()
return ErrStoreClosed
}
@@ -10688,7 +10531,7 @@ func (fs *fileStore) stop(delete, writeState bool) error {
// Mark as closed. Last message block needs to be cleared after
// writeFullState has completed.
fs.closed.Store(true)
fs.closed = true
fs.lmb = nil
// We should update the upper usage layer on a stop.
@@ -10907,12 +10750,11 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool, err
// Create a snapshot of this stream and its consumer's state along with messages.
func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumers bool) (*SnapshotResult, error) {
if fs.isClosed() {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return nil, ErrStoreClosed
}
fs.mu.Lock()
// Only allow one at a time.
if fs.sips > 0 {
fs.mu.Unlock()
@@ -11232,9 +11074,7 @@ func (fs *fileStore) ConsumerStore(name string, created time.Time, cfg *Consumer
go o.flushLoop(o.fch, o.qch)
// Make sure to load in our state from disk if needed.
if err = o.loadState(); err != nil {
return nil, err
}
o.loadState()
// Assign to filestore.
fs.AddConsumer(o)
@@ -11924,15 +11764,10 @@ func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, er
}
// Lock should be held. Called at startup.
func (o *consumerFileStore) loadState() error {
func (o *consumerFileStore) loadState() {
if _, err := os.Stat(o.ifn); err == nil {
// This will load our state in from disk.
_, err = o.stateWithCopyLocked(false)
return err
} else if os.IsNotExist(err) {
return nil
} else {
return err
o.stateWithCopyLocked(false)
}
}

View File

@@ -17,6 +17,9 @@ import (
"errors"
"strings"
"sync"
"unsafe"
"github.com/nats-io/nats-server/v2/server/stree"
)
// Sublist is a routing mechanism to handle subject distribution and
@@ -248,9 +251,7 @@ func matchLevelForAny[T comparable](l *level[T], toks []string, np *int) bool {
if np != nil {
*np += len(n.subs)
}
if len(n.subs) > 0 {
return true
}
return len(n.subs) > 0
}
if pwc != nil {
if np != nil {
@@ -369,36 +370,6 @@ func (s *GenericSublist[T]) Remove(subject string, value T) error {
return s.remove(subject, value, true)
}
// HasInterestStartingIn is a helper for subject tree intersection.
func (s *GenericSublist[T]) HasInterestStartingIn(subj string) bool {
s.RLock()
defer s.RUnlock()
var _tokens [64]string
tokens := tokenizeSubjectIntoSlice(_tokens[:0], subj)
return hasInterestStartingIn(s.root, tokens)
}
func hasInterestStartingIn[T comparable](l *level[T], tokens []string) bool {
if l == nil {
return false
}
if len(tokens) == 0 {
return true
}
token := tokens[0]
if l.fwc != nil {
return true
}
found := false
if pwc := l.pwc; pwc != nil {
found = found || hasInterestStartingIn(pwc.next, tokens[1:])
}
if n := l.nodes[token]; n != nil {
found = found || hasInterestStartingIn(n.next, tokens[1:])
}
return found
}
// pruneNode is used to prune an empty node from the tree.
func (l *level[T]) pruneNode(n *node[T], t string) {
if n == nil {
@@ -492,15 +463,86 @@ func visitLevel[T comparable](l *level[T], depth int) int {
return maxDepth
}
// use similar to append. meaning, the updated slice will be returned
func tokenizeSubjectIntoSlice(tts []string, subject string) []string {
start := 0
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tts = append(tts, subject[start:i])
start = i + 1
// IntersectStree will match all items in the given subject tree that
// have interest expressed in the given sublist. The callback will only be called
// once for each subject, regardless of overlapping subscriptions in the sublist.
func IntersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], sl *GenericSublist[T2], cb func(subj []byte, entry *T1)) {
var _subj [255]byte
intersectStree(st, sl.root, _subj[:0], cb)
}
func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T2], subj []byte, cb func(subj []byte, entry *T1)) {
nsubj := subj
if len(nsubj) > 0 {
nsubj = append(subj, '.')
}
if r.fwc != nil {
// We've reached a full wildcard, do a FWC match on the stree at this point
// and don't keep iterating downward.
nsubj := append(nsubj, '>')
st.Match(nsubj, cb)
return
}
if r.pwc != nil {
// We've found a partial wildcard. We'll keep iterating downwards, but first
// check whether there's interest at this level (without triggering dupes) and
// match if so.
var done bool
nsubj := append(nsubj, '*')
if len(r.pwc.subs) > 0 {
st.Match(nsubj, cb)
done = true
}
if r.pwc.next.numNodes() > 0 {
intersectStree(st, r.pwc.next, nsubj, cb)
}
if done {
return
}
}
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
// A wildcard at the next level will already visit these descendents
// so skip so we don't callback the same subject more than once.
continue
}
nsubj := append(nsubj, t...)
if len(n.subs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
}
}
if n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
tts = append(tts, subject[start:])
return tts
}
// Determine if a subject has any wildcard tokens.
func subjectHasWildcard(subject string) bool {
// This one exits earlier then !subjectIsLiteral(subject)
for i, c := range subject {
if c == pwc || c == fwc {
if (i == 0 || subject[i-1] == btsep) &&
(i+1 == len(subject) || subject[i+1] == btsep) {
return true
}
}
}
return false
}
// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer.
// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same.
func bytesToString(b []byte) string {
if len(b) == 0 {
return _EMPTY_
}
p := unsafe.SliceData(b)
return unsafe.String(p, len(b))
}

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2026 The NATS Authors
// Copyright 2019-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1142,12 +1142,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
}
js.mu.Lock()
// Accounts get reset to nil on shutdown, since we re-acquire the locks here, we need to check again.
if js.accounts == nil {
js.mu.Unlock()
return NewJSNotEnabledError()
}
if jsa, ok := js.accounts[a.Name]; ok {
a.mu.Lock()
a.js = jsa
@@ -1376,7 +1370,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
}
obs, err := mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false)
if err != nil {
s.Warnf(" Error adding consumer '%s > %s > %s': %v", a.Name, mset.name(), cfg.Name, err)
s.Warnf(" Error adding consumer %q: %v", cfg.Name, err)
continue
}
if isEphemeral {
@@ -1385,6 +1379,9 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
if !cfg.Created.IsZero() {
obs.setCreatedTime(cfg.Created)
}
if err != nil {
s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err)
}
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2026 The NATS Authors
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1296,6 +1296,11 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1314,12 +1319,6 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if !doErr {
return
@@ -1613,6 +1612,11 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1631,12 +1635,6 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -1731,6 +1729,11 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1749,12 +1752,6 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -1835,6 +1832,11 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1853,12 +1855,6 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -1971,6 +1967,11 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
Streams: []*StreamInfo{},
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1989,12 +1990,6 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -2095,6 +2090,11 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
resp.ApiResponse.Type = JSApiStreamCreateResponseType
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var clusterWideConsCount int
@@ -2184,12 +2184,6 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -2315,6 +2309,11 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
name := tokenAt(subject, 6)
var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are not in clustered mode this is a failed request.
if !s.JetStreamIsClustered() {
@@ -2346,12 +2345,6 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
return
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -2428,6 +2421,11 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
}
var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are not in clustered mode this is a failed request.
if !s.JetStreamIsClustered() {
@@ -2462,13 +2460,6 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
} else if sa == nil {
return
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var ca *consumerAssignment
if sa.consumers != nil {
ca = sa.consumers[consumer]
@@ -2556,6 +2547,11 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco
name := tokenAt(subject, 6)
var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are not in clustered mode this is a failed request.
if !s.JetStreamIsClustered() {
@@ -2584,12 +2580,6 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco
return
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3082,6 +3072,11 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
accName := tokenAt(subject, 5)
var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !s.JetStreamIsClustered() {
var streams []*stream
@@ -3122,12 +3117,6 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
return
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if js.isMetaRecovering() {
// While in recovery mode, the data structures are not fully initialized
resp.Error = NewJSClusterNotAvailError()
@@ -3350,6 +3339,11 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account,
}
var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -3368,12 +3362,6 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3426,6 +3414,11 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su
stream := tokenAt(subject, 6)
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
@@ -3474,12 +3467,6 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3551,6 +3538,11 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
stream := tokenAt(subject, 6)
var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
@@ -3599,12 +3591,6 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3709,8 +3695,31 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account
stream := streamNameFromSubject(subject)
consumer := consumerNameFromSubject(subject)
var req JSApiConsumerUnpinRequest
var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if req.Group == _EMPTY_ {
resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !validGroupName.MatchString(req.Group) {
resp.Error = NewJSConsumerInvalidGroupNameError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if s.JetStreamIsClustered() {
// Check to make sure the stream is assigned.
js, cc := s.getJetStreamCluster()
@@ -3762,31 +3771,6 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req JSApiConsumerUnpinRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if req.Group == _EMPTY_ {
resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !validGroupName.MatchString(req.Group) {
resp.Error = NewJSConsumerInvalidGroupNameError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -3850,6 +3834,11 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
stream := streamNameFromSubject(subject)
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
@@ -3901,12 +3890,6 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -4524,6 +4507,11 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req CreateConsumerRequest
if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
@@ -4560,20 +4548,6 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
var streamName, consumerName, filteredSubject string
var rt ccReqType
@@ -4606,6 +4580,14 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
if streamName != req.Stream {
resp.Error = NewJSStreamMismatchError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -4750,6 +4732,11 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
Consumers: []string{},
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -4768,12 +4755,6 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -4878,6 +4859,11 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
Consumers: []*ConsumerInfo{},
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -4897,7 +4883,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
resp.Error = NewJSClusterNotAvailError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -4987,6 +4973,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
consumerName := consumerNameFromSubject(subject)
var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = NewJSNotEmptyRequestError()
@@ -5137,12 +5128,6 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !acc.JetStreamEnabled() {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -5190,6 +5175,11 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun
}
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -5208,12 +5198,6 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
@@ -5269,6 +5253,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
var req JSApiConsumerPauseRequest
var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if isJSONObjectOrArray(msg) {
if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
@@ -5296,12 +5285,6 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
}
}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()

View File

@@ -3142,20 +3142,20 @@ func (mset *stream) resetClusteredState(err error) bool {
stype, tierName, replicas := mset.cfg.Storage, mset.tier, mset.cfg.Replicas
mset.mu.RUnlock()
// The stream might already be deleted and not assigned to us anymore.
// In any case, don't revive the stream if it's already closed.
if mset.closed.Load() || (node != nil && node.IsDeleted()) {
s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
// Explicitly returning true here, we want the outside to break out of the monitoring loop as well.
return true
}
assert.Unreachable("Reset clustered state", map[string]any{
"stream": name,
"account": acc.Name,
"err": err,
})
// The stream might already be deleted and not assigned to us anymore.
// In any case, don't revive the stream if it's already closed.
if mset.closed.Load() {
s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
// Explicitly returning true here, we want the outside to break out of the monitoring loop as well.
return true
}
// Stepdown regardless if we are the leader here.
if node != nil {
node.StepDown()

View File

@@ -19,6 +19,7 @@ import (
"fmt"
"math"
"slices"
"sort"
"sync"
"time"
@@ -444,6 +445,7 @@ func (ms *memStore) RegisterProcessJetStreamMsg(cb ProcessJetStreamMsgHandler) {
// GetSeqFromTime looks for the first sequence number that has the message
// with >= timestamp.
// FIXME(dlc) - inefficient.
func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
ts := t.UnixNano()
ms.mu.RLock()
@@ -467,57 +469,18 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
last := lmsg.ts
if ts == last {
return lmsg.seq
return ms.state.LastSeq
}
if ts > last {
return ms.state.LastSeq + 1
}
var (
cts int64
cseq uint64
off uint64
)
// Using a binary search, but need to be aware of interior deletes.
fseq := ms.state.FirstSeq
lseq := ms.state.LastSeq
seq := lseq + 1
loop:
for fseq <= lseq {
mid := fseq + (lseq-fseq)/2
off = 0
// Potentially skip over gaps. We keep the original middle but keep track of a
// potential delete range with an offset.
for {
msg := ms.msgs[mid+off]
if msg == nil {
off++
if mid+off <= lseq {
continue
} else {
// Continue search to the left. Purposely ignore the skipped deletes here.
lseq = mid - 1
continue loop
}
}
cts = msg.ts
cseq = msg.seq
break
index := sort.Search(len(ms.msgs), func(i int) bool {
if msg := ms.msgs[ms.state.FirstSeq+uint64(i)]; msg != nil {
return msg.ts >= ts
}
if cts >= ts {
seq = cseq
if mid == fseq {
break
}
// Continue search to the left.
lseq = mid - 1
} else {
// Continue search to the right (potentially skipping over interior deletes).
fseq = mid + off + 1
}
}
return seq
return false
})
return uint64(index) + ms.state.FirstSeq
}
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
@@ -943,7 +906,7 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPerS
var havePartial bool
var totalSkipped uint64
// We will track start and end sequences as we go.
stree.IntersectGSL[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
gsl.IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
}
@@ -1500,12 +1463,6 @@ func (ms *memStore) compact(seq uint64) (uint64, error) {
var purged, bytes uint64
ms.mu.Lock()
// Short-circuit if the store was already compacted past this point.
if ms.state.FirstSeq > seq {
ms.mu.Unlock()
return purged, nil
}
cb := ms.scb
if seq <= ms.state.LastSeq {
fseq := ms.state.FirstSeq

View File

@@ -1,4 +1,4 @@
// Copyright 2013-2026 The NATS Authors
// Copyright 2013-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1279,7 +1279,6 @@ type Varz struct {
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats are statistics about all detected Slow Consumer
StaleConnectionStats *StaleConnectionStats `json:"stale_connection_stats,omitempty"` // StaleConnectionStats are statistics about all detected Stale Connections
Proxies *ProxiesOptsVarz `json:"proxies,omitempty"` // Proxies hold information about network proxy devices
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate of this server
}
// JetStreamVarz contains basic runtime information about jetstream
@@ -1292,36 +1291,34 @@ type JetStreamVarz struct {
// ClusterOptsVarz contains monitoring cluster information
type ClusterOptsVarz struct {
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections
Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections
Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
}
// GatewayOptsVarz contains monitoring gateway information
type GatewayOptsVarz struct {
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections
Port int `json:"port,omitempty"` // Port is the post gateway connections listens on
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients
ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes
RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificaet
Name string `json:"name,omitempty"` // Name is the configured cluster name
Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections
Port int `json:"port,omitempty"` // Port is the post gateway connections listens on
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients
ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes
RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
}
// RemoteGatewayOptsVarz contains monitoring remote gateway information
@@ -1343,7 +1340,6 @@ type LeafNodeOptsVarz struct {
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
// DenyRules Contains lists of subjects not allowed to be imported/exported
@@ -1374,7 +1370,6 @@ type MQTTOptsVarz struct {
AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete
MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
// WebsocketOptsVarz contains monitoring websocket information
@@ -1393,7 +1388,6 @@ type WebsocketOptsVarz struct {
AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins
Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
// OCSPResponseCacheVarz contains OCSP response cache information
@@ -1460,22 +1454,6 @@ func myUptime(d time.Duration) string {
return fmt.Sprintf("%ds", tsecs)
}
func tlsCertNotAfter(config *tls.Config) time.Time {
if config == nil || len(config.Certificates) == 0 {
return time.Time{}
}
cert := config.Certificates[0]
leaf := cert.Leaf
if leaf == nil {
var err error
leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return time.Time{}
}
}
return leaf.NotAfter
}
// HandleRoot will show basic info and links to others handlers.
func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
// This feels dumb to me, but is required: https://code.google.com/p/go/issues/detail?id=4799
@@ -1801,13 +1779,6 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
v.TLSOCSPPeerVerify = s.ocspPeerVerify && v.TLSRequired && s.opts.tlsConfigOpts != nil && s.opts.tlsConfigOpts.OCSPPeerConfig != nil && s.opts.tlsConfigOpts.OCSPPeerConfig.Verify
v.TLSCertNotAfter = tlsCertNotAfter(opts.TLSConfig)
v.Cluster.TLSCertNotAfter = tlsCertNotAfter(opts.Cluster.TLSConfig)
v.Gateway.TLSCertNotAfter = tlsCertNotAfter(opts.Gateway.TLSConfig)
v.LeafNode.TLSCertNotAfter = tlsCertNotAfter(opts.LeafNode.TLSConfig)
v.MQTT.TLSCertNotAfter = tlsCertNotAfter(opts.MQTT.TLSConfig)
v.Websocket.TLSCertNotAfter = tlsCertNotAfter(opts.Websocket.TLSConfig)
if opts.Proxies != nil {
if v.Proxies == nil {
v.Proxies = &ProxiesOptsVarz{}
@@ -4011,11 +3982,6 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
return health
}
// Healthz returns the health status of the server.
func (s *Server) Healthz(opts *HealthzOptions) *HealthStatus {
return s.healthz(opts)
}
type ExpvarzStatus struct {
Memstats json.RawMessage `json:"memstats"`
Cmdline json.RawMessage `json:"cmdline"`

View File

@@ -2326,7 +2326,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error {
case "cluster_traffic":
vv, ok := mv.(string)
if !ok {
return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'owner' string value for %q, got %v", mk, mv)}
return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)}
}
switch vv {
case "system", _EMPTY_:

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2026 The NATS Authors
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -83,7 +83,6 @@ type RaftNode interface {
Stop()
WaitForStop()
Delete()
IsDeleted() bool
RecreateInternalSubs() error
IsSystemAccount() bool
GetTrafficAccountName() string
@@ -232,7 +231,6 @@ type raft struct {
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
membChanging bool // There is a membership change proposal in progress
deleted bool // If the node was deleted.
}
type proposedEntry struct {
@@ -1737,6 +1735,11 @@ func (n *raft) StepDown(preferred ...string) error {
}
}
}
// Clear our vote state.
n.vote = noVote
n.writeTermVote()
n.Unlock()
if len(preferred) > 0 && maybeLeader == noLeader {
@@ -1917,7 +1920,6 @@ func (n *raft) Delete() {
n.Lock()
defer n.Unlock()
n.deleted = true
if wal := n.wal; wal != nil {
wal.Delete(false)
}
@@ -1925,12 +1927,6 @@ func (n *raft) Delete() {
n.debug("Deleted")
}
func (n *raft) IsDeleted() bool {
n.RLock()
defer n.RUnlock()
return n.deleted
}
func (n *raft) shutdown() {
// First call to Stop or Delete should close the quit chan
// to notify the runAs goroutines to stop what they're doing.
@@ -3405,9 +3401,9 @@ func (n *raft) runAsCandidate() {
n.requestVote()
// We vote for ourselves.
n.votes.push(&voteResponse{term: n.term, peer: n.ID(), granted: true})
votes := map[string]struct{}{}
votes := map[string]struct{}{
n.ID(): {},
}
emptyVotes := map[string]struct{}{}
for n.State() == Candidate {
@@ -3972,6 +3968,10 @@ CONTINUE:
// Here we can become a leader but need to wait for resume of the apply queue.
n.lxfer = true
}
} else if n.vote != noVote {
// Since we are here we are not the chosen one but we should clear any vote preference.
n.vote = noVote
n.writeTermVote()
}
}
case EntryAddPeer:
@@ -4211,9 +4211,6 @@ func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) error {
if !shouldStore {
ae.returnToPool()
}
if n.csz == 1 {
n.tryCommit(n.pindex)
}
return nil
}

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2026 The NATS Authors
// Copyright 2019-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -425,7 +425,6 @@ type stream struct {
active bool // Indicates that there are active internal subscriptions (for the subject filters)
// and/or mirror/sources consumers are scheduled to be established or already started.
closed atomic.Bool // Set to true when stop() is called on the stream.
cisrun atomic.Bool // Indicates one checkInterestState is already running.
// Mirror
mirror *sourceInfo
@@ -838,9 +837,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
// Add created timestamp used for the store, must match that of the stream assignment if it exists.
if sa != nil {
// The following assignment does not require mutex
// protection: sa.Created is immutable.
js.mu.RLock()
mset.created = sa.Created
js.mu.RUnlock()
}
// Start our signaling routine to process consumers.
@@ -1822,7 +1821,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
// check for duplicates
var iNames = make(map[string]struct{})
for _, src := range cfg.Sources {
if src == nil || !isValidName(src.Name) {
if !isValidName(src.Name) {
return StreamConfig{}, NewJSSourceInvalidStreamNameError()
}
if _, ok := iNames[src.composeIName()]; !ok {
@@ -3163,6 +3162,7 @@ func (mset *stream) setupMirrorConsumer() error {
}
mirror := mset.mirror
mirrorWg := &mirror.wg
// We want to throttle here in terms of how fast we request new consumers,
// or if the previous is still in progress.
@@ -3321,16 +3321,7 @@ func (mset *stream) setupMirrorConsumer() error {
// Wait for previous processMirrorMsgs go routine to be completely done.
// If none is running, this will not block.
mset.mu.Lock()
if mset.mirror == nil {
// Mirror config has been removed.
mset.mu.Unlock()
return
} else {
wg := &mset.mirror.wg
mset.mu.Unlock()
wg.Wait()
}
mirrorWg.Wait()
select {
case ccr := <-respCh:
@@ -6054,6 +6045,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
return nil
}
// If here we will attempt to store the message.
// Assume this will succeed.
olmsgId := mset.lmsgId
mset.lmsgId = msgId
mset.lseq++
tierName := mset.tier
// Republish state if needed.
var tsubj string
var tlseq uint64
@@ -6077,7 +6075,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// If clustered this was already checked and we do not want to check here and possibly introduce skew.
// Don't error and log if we're tracing when clustered.
if !isClustered {
if exceeded, err := jsa.wouldExceedLimits(stype, mset.tier, mset.cfg.Replicas, subject, hdr, msg); exceeded {
if exceeded, err := jsa.wouldExceedLimits(stype, tierName, mset.cfg.Replicas, subject, hdr, msg); exceeded {
if err == nil {
err = NewJSAccountResourcesExceededError()
}
@@ -6136,7 +6134,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err)
return err
}
// If we did not succeed increment clfs in case we are clustered.
// If we did not succeed put those values back and increment clfs in case we are clustered.
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
bumpCLFS()
switch err {
@@ -6157,8 +6159,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
// If here we succeeded in storing the message.
mset.lmsgId = msgId
mset.lseq = seq
// If we have a msgId make sure to save.
// This will replace our estimate from the cluster layer if we are clustered.
@@ -7298,30 +7298,11 @@ func (mset *stream) checkInterestState() {
return
}
// Ensure only one of these runs at the same time.
if !mset.cisrun.CompareAndSwap(false, true) {
return
}
defer mset.cisrun.Store(false)
var ss StreamState
mset.store.FastState(&ss)
asflr := uint64(math.MaxUint64)
for _, o := range mset.getConsumers() {
o.checkStateForInterestStream(&ss)
o.mu.RLock()
chkflr := o.chkflr
o.mu.RUnlock()
asflr = min(asflr, chkflr)
}
mset.cfgMu.RLock()
rp := mset.cfg.Retention
mset.cfgMu.RUnlock()
// Remove as many messages from the "head" of the stream if there's no interest anymore.
if rp == InterestPolicy && asflr != math.MaxUint64 {
mset.store.Compact(asflr)
}
}
@@ -7408,18 +7389,20 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
o.sigSubs = nil
}
if mset.csl == nil {
mset.csl = gsl.NewSublist[*consumer]()
}
// If no filters are present, add fwcs to sublist for that consumer.
if newFilters == nil {
mset.csl.Insert(fwcs, o)
o.sigSubs = append(o.sigSubs, fwcs)
} else {
// If there are filters, add their subjects to sublist.
for _, filter := range newFilters {
mset.csl.Insert(filter, o)
o.sigSubs = append(o.sigSubs, filter)
if o.isLeader() {
if mset.csl == nil {
mset.csl = gsl.NewSublist[*consumer]()
}
// If no filters are preset, add fwcs to sublist for that consumer.
if newFilters == nil {
mset.csl.Insert(fwcs, o)
o.sigSubs = append(o.sigSubs, fwcs)
// If there are filters, add their subjects to sublist.
} else {
for _, filter := range newFilters {
mset.csl.Insert(filter, o)
o.sigSubs = append(o.sigSubs, filter)
}
}
}
o.mu.Unlock()
@@ -7496,18 +7479,14 @@ func (mset *stream) partitionUnique(name string, partitions []string) bool {
if n == name {
continue
}
o.mu.RLock()
if o.subjf == nil {
o.mu.RUnlock()
return false
}
for _, filter := range o.subjf {
if SubjectsCollide(partition, filter.subject) {
o.mu.RUnlock()
return false
}
}
o.mu.RUnlock()
}
}
return true

View File

@@ -16,9 +16,6 @@ package stree
import (
"bytes"
"slices"
"unsafe"
"github.com/nats-io/nats-server/v2/server/gsl"
)
// SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects.
@@ -451,60 +448,3 @@ func LazyIntersect[TL, TR any](tl *SubjectTree[TL], tr *SubjectTree[TR], cb func
})
}
}
// IntersectGSL will match all items in the given subject tree that
// have interest expressed in the given sublist. The callback will only be called
// once for each subject, regardless of overlapping subscriptions in the sublist.
func IntersectGSL[T any, SL comparable](t *SubjectTree[T], sl *gsl.GenericSublist[SL], cb func(subject []byte, val *T)) {
if t == nil || t.root == nil || sl == nil {
return
}
var _pre [256]byte
_intersectGSL(t.root, _pre[:0], sl, cb)
}
func _intersectGSL[T any, SL comparable](n node, pre []byte, sl *gsl.GenericSublist[SL], cb func(subject []byte, val *T)) {
if n.isLeaf() {
ln := n.(*leaf[T])
subj := append(pre, ln.suffix...)
if sl.HasInterest(bytesToString(subj)) {
cb(subj, &ln.value)
}
return
}
bn := n.base()
pre = append(pre, bn.prefix...)
for _, cn := range n.children() {
if cn == nil {
continue
}
subj := append(pre, cn.path()...)
if !hasInterestForTokens(sl, subj, len(pre)) {
continue
}
_intersectGSL(cn, pre, sl, cb)
}
}
// The subject tree can return partial tokens so we need to check starting interest
// only from whole tokens when we encounter a tsep.
func hasInterestForTokens[SL comparable](sl *gsl.GenericSublist[SL], subj []byte, since int) bool {
for i := since; i < len(subj); i++ {
if subj[i] == tsep {
if !sl.HasInterestStartingIn(bytesToString(subj[:i])) {
return false
}
}
}
return true
}
// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer.
// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same.
func bytesToString(b []byte) string {
if len(b) == 0 {
return ""
}
p := unsafe.SliceData(b)
return unsafe.String(p, len(b))
}

View File

@@ -21,6 +21,8 @@ import (
"sync"
"sync/atomic"
"unicode/utf8"
"github.com/nats-io/nats-server/v2/server/stree"
)
// Sublist is a routing mechanism to handle subject distribution and
@@ -816,9 +818,7 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool {
*nq += len(qsub)
}
}
if len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 {
return true
}
return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0
}
if pwc != nil {
if np != nil && nq != nil {
@@ -1726,3 +1726,63 @@ func getAllNodes(l *level, results *SublistResult) {
getAllNodes(n.next, results)
}
}
// IntersectStree will match all items in the given subject tree that
// have interest expressed in the given sublist. The callback will only be called
// once for each subject, regardless of overlapping subscriptions in the sublist.
func IntersectStree[T any](st *stree.SubjectTree[T], sl *Sublist, cb func(subj []byte, entry *T)) {
var _subj [255]byte
intersectStree(st, sl.root, _subj[:0], cb)
}
func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb func(subj []byte, entry *T)) {
nsubj := subj
if len(nsubj) > 0 {
nsubj = append(subj, '.')
}
if r.fwc != nil {
// We've reached a full wildcard, do a FWC match on the stree at this point
// and don't keep iterating downward.
nsubj := append(nsubj, '>')
st.Match(nsubj, cb)
return
}
if r.pwc != nil {
// We've found a partial wildcard. We'll keep iterating downwards, but first
// check whether there's interest at this level (without triggering dupes) and
// match if so.
var done bool
nsubj := append(nsubj, '*')
if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 {
st.Match(nsubj, cb)
done = true
}
if r.pwc.next.numNodes() > 0 {
intersectStree(st, r.pwc.next, nsubj, cb)
}
if done {
return
}
}
// Normal node with subject literals, keep iterating.
for t, n := range r.nodes {
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
// A wildcard at the next level will already visit these descendents
// so skip so we don't callback the same subject more than once.
continue
}
nsubj := append(nsubj, t...)
if len(n.psubs)+len(n.qsubs) > 0 {
if subjectHasWildcard(bytesToString(nsubj)) {
st.Match(nsubj, cb)
} else {
if e, ok := st.Find(nsubj); ok {
cb(nsubj, e)
}
}
}
if n.next.numNodes() > 0 {
intersectStree(st, n.next, nsubj, cb)
}
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2025 The NATS Authors
// Copyright 2012-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -21,6 +21,7 @@ package store
import (
"context"
"crypto/tls"
"fmt"
"strings"
"time"
@@ -32,6 +33,7 @@ import (
"github.com/opencloud-eu/reva/v2/pkg/store/etcd"
"github.com/opencloud-eu/reva/v2/pkg/store/memory"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
microstore "go-micro.dev/v4/store"
)
@@ -125,19 +127,33 @@ func Create(opts ...microstore.Option) microstore.Store {
return *ocMemStore
case TypeNatsJS:
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
return natsjs.NewStore(
store := natsjs.NewStore(
append(opts,
natsjs.NatsOptions(natsOptions), // always pass in properly initialized default nats options
natsjs.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
)
err := updateNatsStore(opts, ttl, natsOptions)
if err != nil {
options.Logger.Logf(logger.ErrorLevel, "failed to update nats-js store: '%s'", err.Error())
}
return store
case TypeNatsJSKV:
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
return natsjskv.NewStore(
store := natsjskv.NewStore(
append(opts,
natsjskv.NatsOptions(natsOptions), // always pass in properly initialized default nats options
natsjskv.EncodeKeys(), // nats has restrictions on the key, we cannot use slashes
natsjskv.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
)
err := updateNatsStore(opts, ttl, natsOptions)
if err != nil {
options.Logger.Logf(logger.ErrorLevel, "failed to update nats-js-kv store: '%s'", err.Error())
}
return store
case TypeMemory, "mem", "": // allow existing short form and use as default
return microstore.NewMemoryStore(opts...)
default:
@@ -146,13 +162,58 @@ func Create(opts ...microstore.Option) microstore.Store {
}
}
func updateNatsStore(opts []store.Option, ttl time.Duration, natsOptions nats.Options) error {
options := store.Options{}
for _, o := range opts {
o(&options)
}
bucketName := options.Database
if bucketName == "" {
return fmt.Errorf("bucket name (database) must be set")
}
if len(options.Nodes) > 0 {
natsOptions.Servers = options.Nodes
}
nc, err := natsOptions.Connect()
if err != nil {
return fmt.Errorf("could not connect to nats: %w", err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
return err
}
// NATS KV buckets are actually streams named "KV_<bucket_name>"
info, err := js.StreamInfo("KV_" + bucketName)
if err != nil {
return fmt.Errorf("failed to get bucket info: %w", err)
}
config := info.Config
config.MaxAge = ttl
_, err = js.UpdateStream(&config)
if err != nil {
return fmt.Errorf("failed to update bucket TTL: %w", err)
}
return nil
}
func natsConfig(log logger.Logger, ctx context.Context, opts []microstore.Option) ([]microstore.Option, time.Duration, nats.Options) {
if mem, _ := ctx.Value(disablePersistanceContextKey{}).(bool); mem {
opts = append(opts, natsjs.DefaultMemory())
}
ttl, _ := ctx.Value(ttlContextKey{}).(time.Duration)
ttl := time.Duration(0)
if d, ok := ctx.Value(ttlContextKey{}).(time.Duration); ok {
ttl = d
}
// preparing natsOptions before the switch to reuse the same code
natsOptions := nats.GetDefaultOptions()

8
vendor/modules.txt vendored
View File

@@ -746,7 +746,7 @@ github.com/google/go-querystring/query
# github.com/google/go-tika v0.3.1
## explicit; go 1.11
github.com/google/go-tika/tika
# github.com/google/go-tpm v0.9.8
# github.com/google/go-tpm v0.9.7
## explicit; go 1.22
github.com/google/go-tpm/legacy/tpm2
github.com/google/go-tpm/tpmutil
@@ -873,7 +873,7 @@ github.com/justinas/alice
# github.com/kevinburke/ssh_config v1.2.0
## explicit
github.com/kevinburke/ssh_config
# github.com/klauspost/compress v1.18.3
# github.com/klauspost/compress v1.18.2
## explicit; go 1.23
github.com/klauspost/compress
github.com/klauspost/compress/flate
@@ -1155,7 +1155,7 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.8.0
## explicit; go 1.23.0
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.12.4
# github.com/nats-io/nats-server/v2 v2.12.3
## explicit; go 1.24.0
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand
@@ -1376,7 +1376,7 @@ github.com/opencloud-eu/icap-client
# github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
## explicit; go 1.18
github.com/opencloud-eu/libre-graph-api-go
# github.com/opencloud-eu/reva/v2 v2.42.0
# github.com/opencloud-eu/reva/v2 v2.42.1
## explicit; go 1.24.1
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
github.com/opencloud-eu/reva/v2/cmd/revad/runtime