mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-29 08:21:23 -05:00
Compare commits
10 Commits
dependabot
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
663fe3740b | ||
|
|
4a83b34b91 | ||
|
|
8b4c218a5f | ||
|
|
70623d6007 | ||
|
|
cad6a61120 | ||
|
|
754f0fa5b7 | ||
|
|
38cf037a11 | ||
|
|
9eac47dab4 | ||
|
|
e7c14d2ee4 | ||
|
|
cd408c6923 |
@@ -523,6 +523,7 @@ def main(ctx):
|
||||
testPipelines(ctx)
|
||||
|
||||
build_release_pipelines = \
|
||||
checkVersionPlaceholder() + \
|
||||
dockerReleases(ctx) + \
|
||||
binaryReleases(ctx)
|
||||
|
||||
@@ -1817,6 +1818,27 @@ def dockerReleases(ctx):
|
||||
|
||||
return pipelines
|
||||
|
||||
def checkVersionPlaceholder():
|
||||
return [{
|
||||
"name": "check-version-placeholder",
|
||||
"steps": [
|
||||
{
|
||||
"name": "check-version-placeholder",
|
||||
"image": OC_CI_ALPINE,
|
||||
"commands": [
|
||||
"grep -r -e '%%NEXT%%' -e '%%NEXT_PRODUCTION_VERSION%%' %s/services %s/pkg > next_version.txt" % (
|
||||
dirs["base"],
|
||||
dirs["base"],
|
||||
),
|
||||
'if [ -s next_version.txt ]; then echo "replace version placeholders"; cat next_version.txt; exit 1; fi',
|
||||
],
|
||||
},
|
||||
],
|
||||
"when": [
|
||||
event["tag"],
|
||||
],
|
||||
}]
|
||||
|
||||
def dockerRelease(ctx, repo, build_type):
|
||||
build_args = {
|
||||
"REVISION": "%s" % ctx.build.commit,
|
||||
@@ -2230,7 +2252,7 @@ def genDocsPr(ctx):
|
||||
"MY_TARGET_BRANCH": "${CI_COMMIT_BRANCH##stable-}",
|
||||
},
|
||||
"commands": [
|
||||
'export DOC_GIT_TARGET_FOLDER="$$(if [ \"$$MY_TARGET_BRANCH\" = \"main\" ]; then echo \"tmpdocs/docs/dev/_static/env-vars/\"; else echo \"tmpdocs/versioned_docs/version-$${MY_TARGET_BRANCH}/dev/_static/env-vars/\"; fi)"',
|
||||
'export DOC_GIT_TARGET_FOLDER="$$(if [ \"$$MY_TARGET_BRANCH\" = \"main\" ]; then echo \"tmpdocs/docs/_static/env-vars/\"; else echo \"tmpdocs/versioned_docs/version-$${MY_TARGET_BRANCH}/_static/env-vars/\"; fi)"',
|
||||
'echo "$${CI_SSH_KEY}" > /root/id_rsa && chmod 600 /root/id_rsa',
|
||||
'git config --global user.email "devops@opencloud.eu"',
|
||||
'git config --global user.name "openclouders"',
|
||||
@@ -2255,8 +2277,8 @@ def genDocsPr(ctx):
|
||||
},
|
||||
{
|
||||
"event": "cron",
|
||||
"branch": "[main]",
|
||||
"cron": "nightly *",
|
||||
"branch": "main",
|
||||
"cron": "nightly*",
|
||||
},
|
||||
],
|
||||
}]
|
||||
|
||||
18
CHANGELOG.md
18
CHANGELOG.md
@@ -1,5 +1,23 @@
|
||||
# Changelog
|
||||
|
||||
## [5.0.1](https://github.com/opencloud-eu/opencloud/releases/tag/v5.0.1) - 2026-01-28
|
||||
|
||||
### ❤️ Thanks to all contributors! ❤️
|
||||
|
||||
@ScharfViktor, @aduffeck, @saw-jan
|
||||
|
||||
### 🐛 Bug Fixes
|
||||
|
||||
- Do not ever set a TTL for the ID cache. It's not supposed to expire. [[#2223](https://github.com/opencloud-eu/opencloud/pull/2223)]
|
||||
|
||||
### ✅ Tests
|
||||
|
||||
- test(api): wait for web-office readiness by checking discovery endpoint [[#2217](https://github.com/opencloud-eu/opencloud/pull/2217)]
|
||||
|
||||
### 📦️ Dependencies
|
||||
|
||||
- reva-bump-2.42.1 [[#2225](https://github.com/opencloud-eu/opencloud/pull/2225)]
|
||||
|
||||
## [5.0.0](https://github.com/opencloud-eu/opencloud/releases/tag/v5.0.0) - 2026-01-26
|
||||
|
||||
### ❤️ Thanks to all contributors! ❤️
|
||||
|
||||
8
go.mod
8
go.mod
@@ -55,7 +55,7 @@ require (
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/mna/pigeon v1.3.0
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
|
||||
github.com/nats-io/nats-server/v2 v2.12.4
|
||||
github.com/nats-io/nats-server/v2 v2.12.3
|
||||
github.com/nats-io/nats.go v1.48.0
|
||||
github.com/oklog/run v1.2.0
|
||||
github.com/olekukonko/tablewriter v1.1.3
|
||||
@@ -65,7 +65,7 @@ require (
|
||||
github.com/open-policy-agent/opa v1.12.3
|
||||
github.com/opencloud-eu/icap-client v0.0.0-20250930132611-28a2afe62d89
|
||||
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
|
||||
github.com/opencloud-eu/reva/v2 v2.42.0
|
||||
github.com/opencloud-eu/reva/v2 v2.42.1
|
||||
github.com/opensearch-project/opensearch-go/v4 v4.6.0
|
||||
github.com/orcaman/concurrent-map v1.0.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
@@ -241,7 +241,7 @@ require (
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/gomodule/redigo v1.9.3 // indirect
|
||||
github.com/google/go-querystring v1.1.0 // indirect
|
||||
github.com/google/go-tpm v0.9.8 // indirect
|
||||
github.com/google/go-tpm v0.9.7 // indirect
|
||||
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
|
||||
github.com/google/renameio/v2 v2.0.1 // indirect
|
||||
github.com/gookit/goutil v0.7.1 // indirect
|
||||
@@ -261,7 +261,7 @@ require (
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/juliangruber/go-intersect v1.1.0 // indirect
|
||||
github.com/kevinburke/ssh_config v1.2.0 // indirect
|
||||
github.com/klauspost/compress v1.18.3 // indirect
|
||||
github.com/klauspost/compress v1.18.2 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
|
||||
github.com/klauspost/crc32 v1.3.0 // indirect
|
||||
github.com/kovidgoyal/go-parallel v1.1.1 // indirect
|
||||
|
||||
16
go.sum
16
go.sum
@@ -575,8 +575,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD
|
||||
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
|
||||
github.com/google/go-tika v0.3.1 h1:l+jr10hDhZjcgxFRfcQChRLo1bPXQeLFluMyvDhXTTA=
|
||||
github.com/google/go-tika v0.3.1/go.mod h1:DJh5N8qxXIl85QkqmXknd+PeeRkUOTbvwyYf7ieDz6c=
|
||||
github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo=
|
||||
github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
|
||||
github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA=
|
||||
github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
|
||||
@@ -730,8 +730,8 @@ github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF
|
||||
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
|
||||
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
|
||||
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
|
||||
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||
@@ -916,8 +916,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
|
||||
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
|
||||
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
|
||||
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
|
||||
github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts=
|
||||
github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg=
|
||||
github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs=
|
||||
github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y=
|
||||
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
|
||||
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
|
||||
@@ -969,8 +969,8 @@ github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9 h1:dIft
|
||||
github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9/go.mod h1:JWyDC6H+5oZRdUJUgKuaye+8Ph5hEs6HVzVoPKzWSGI=
|
||||
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 h1:vD/EdfDUrv4omSFjrinT8Mvf+8D7f9g4vgQ2oiDrVUI=
|
||||
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q=
|
||||
github.com/opencloud-eu/reva/v2 v2.42.0 h1:CWlXbNqUSduQ5Afi6XoegoJ/zyV0Vx5UoPKAZZmEAq4=
|
||||
github.com/opencloud-eu/reva/v2 v2.42.0/go.mod h1:pv+w23JG0/qJweZbTzNNev//YEvlUML1L/2iXgKGkkg=
|
||||
github.com/opencloud-eu/reva/v2 v2.42.1 h1:QUZOLSfAhb7bw+qsVSFMFY644rUz4/NtnOiJ0QQxj2o=
|
||||
github.com/opencloud-eu/reva/v2 v2.42.1/go.mod h1:pv+w23JG0/qJweZbTzNNev//YEvlUML1L/2iXgKGkkg=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
|
||||
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
|
||||
|
||||
@@ -34,7 +34,7 @@ var (
|
||||
// LatestTag is the latest released version plus the dev meta version.
|
||||
// Will be overwritten by the release pipeline
|
||||
// Needs a manual change for every tagged release
|
||||
LatestTag = "5.0.0+dev"
|
||||
LatestTag = "5.0.1+dev"
|
||||
|
||||
// Date indicates the build date.
|
||||
// This has been removed, it looks like you can only replace static strings with recent go versions
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Ivan Fustero, 2025\n"
|
||||
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Jörn Friedrich Dreyer <jfd@butonic.de>, 2025\n"
|
||||
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Elías Martín, 2025\n"
|
||||
"Language-Team: Spanish (https://app.transifex.com/opencloud-eu/teams/204053/es/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
|
||||
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Simone Broglia, 2025\n"
|
||||
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"
|
||||
|
||||
@@ -12,7 +12,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Junghyuk Kwon <kwon@junghy.uk>, 2025\n"
|
||||
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: YQS Yang, 2025\n"
|
||||
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Ivan Fustero, 2025\n"
|
||||
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Jörn Friedrich Dreyer <jfd@butonic.de>, 2025\n"
|
||||
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Elías Martín, 2025\n"
|
||||
"Language-Team: Spanish (https://app.transifex.com/opencloud-eu/teams/204053/es/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
|
||||
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Simone Broglia, 2025\n"
|
||||
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: gapho shin, 2025\n"
|
||||
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: YQS Yang, 2025\n"
|
||||
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Ivan Fustero, 2025\n"
|
||||
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"
|
||||
|
||||
@@ -12,7 +12,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Jonas, 2025\n"
|
||||
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
|
||||
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Simone Broglia, 2025\n"
|
||||
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: gapho shin, 2025\n"
|
||||
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: YQS Yang, 2025\n"
|
||||
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Ivan Fustero, 2025\n"
|
||||
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Jörn Friedrich Dreyer <jfd@butonic.de>, 2025\n"
|
||||
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"
|
||||
|
||||
@@ -12,7 +12,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Alejandro Robles, 2025\n"
|
||||
"Language-Team: Spanish (https://app.transifex.com/opencloud-eu/teams/204053/es/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
|
||||
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Simone Pagano, 2025\n"
|
||||
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: gapho shin, 2025\n"
|
||||
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"
|
||||
|
||||
@@ -12,7 +12,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Lulufox, 2025\n"
|
||||
"Language-Team: Russian (https://app.transifex.com/opencloud-eu/teams/204053/ru/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Davis Kaza, 2025\n"
|
||||
"Language-Team: Swedish (https://app.transifex.com/opencloud-eu/teams/204053/sv/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: YQS Yang, 2025\n"
|
||||
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Ivan Fustero, 2025\n"
|
||||
"Language-Team: Catalan (https://app.transifex.com/opencloud-eu/teams/204053/ca/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Jörn Friedrich Dreyer <jfd@butonic.de>, 2025\n"
|
||||
"Language-Team: German (https://app.transifex.com/opencloud-eu/teams/204053/de/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Elías Martín, 2025\n"
|
||||
"Language-Team: Spanish (https://app.transifex.com/opencloud-eu/teams/204053/es/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: eric_G <junk.eg@free.fr>, 2025\n"
|
||||
"Language-Team: French (https://app.transifex.com/opencloud-eu/teams/204053/fr/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Simone Broglia, 2025\n"
|
||||
"Language-Team: Italian (https://app.transifex.com/opencloud-eu/teams/204053/it/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: gapho shin, 2025\n"
|
||||
"Language-Team: Korean (https://app.transifex.com/opencloud-eu/teams/204053/ko/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: Lulufox, 2025\n"
|
||||
"Language-Team: Russian (https://app.transifex.com/opencloud-eu/teams/204053/ru/)\n"
|
||||
|
||||
@@ -11,7 +11,7 @@ msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: \n"
|
||||
"Report-Msgid-Bugs-To: EMAIL\n"
|
||||
"POT-Creation-Date: 2026-01-09 00:08+0000\n"
|
||||
"POT-Creation-Date: 2026-01-29 00:12+0000\n"
|
||||
"PO-Revision-Date: 2025-01-27 10:17+0000\n"
|
||||
"Last-Translator: YQS Yang, 2025\n"
|
||||
"Language-Team: Chinese (https://app.transifex.com/opencloud-eu/teams/204053/zh/)\n"
|
||||
|
||||
@@ -279,14 +279,16 @@ function run_behat_tests() {
|
||||
fi
|
||||
fi
|
||||
|
||||
FAILED_SCENARIO_PATHS_COLORED=`awk '/Failed scenarios:/',0 ${TEST_LOG_FILE} | grep -a feature`
|
||||
FAILED_SCENARIO_PATHS_COLORED=`awk '/Failed (scenarios|hooks):/',0 ${TEST_LOG_FILE} | grep -a feature`
|
||||
# There will be some ANSI escape codes for color in the FEATURE_COLORED var.
|
||||
# Strip them out so we can pass just the ordinary feature details to Behat.
|
||||
# Also strip everything after ".feature:XX", including text such as "(on line xx)" added by Behat indicating the failing step's line number.
|
||||
# Thanks to https://en.wikipedia.org/wiki/Tee_(command) and
|
||||
# https://stackoverflow.com/questions/23416278/how-to-strip-ansi-escape-sequences-from-a-variable
|
||||
# for ideas.
|
||||
FAILED_SCENARIO_PATHS=$(echo "${FAILED_SCENARIO_PATHS_COLORED}" | sed "s/\x1b[^m]*m//g" | sed 's/\(\.feature:[0-9]\+\).*/\1/')
|
||||
FAILED_SCENARIO_PATHS=$(echo "${FAILED_SCENARIO_PATHS_COLORED}" | sed "s/\x1b[^m]*m//g" | sed "s/AfterScenario \"//g" | sed 's/\(\.feature:[0-9]\+\).*/\1/')
|
||||
# remove duplicate scenario paths
|
||||
FAILED_SCENARIO_PATHS=$(echo "$FAILED_SCENARIO_PATHS" | awk '!seen[$0]++')
|
||||
|
||||
# If something else went wrong, and there were no failed scenarios,
|
||||
# then the awk, grep, sed command sequence above ends up with an empty string.
|
||||
|
||||
61
vendor/github.com/nats-io/nats-server/v2/conf/parse.go
generated
vendored
61
vendor/github.com/nats-io/nats-server/v2/conf/parse.go
generated
vendored
@@ -60,9 +60,6 @@ type parser struct {
|
||||
|
||||
// pedantic reports error when configuration is not correct.
|
||||
pedantic bool
|
||||
|
||||
// Tracks environment variable references, to avoid cycles
|
||||
envVarReferences map[string]bool
|
||||
}
|
||||
|
||||
// Parse will return a map of keys to any, although concrete types
|
||||
@@ -183,37 +180,16 @@ func (t *token) Position() int {
|
||||
return t.item.pos
|
||||
}
|
||||
|
||||
func newParser(data, fp string, pedantic bool) *parser {
|
||||
return &parser{
|
||||
mapping: make(map[string]any),
|
||||
lx: lex(data),
|
||||
ctxs: make([]any, 0, 4),
|
||||
keys: make([]string, 0, 4),
|
||||
ikeys: make([]item, 0, 4),
|
||||
fp: filepath.Dir(fp),
|
||||
pedantic: pedantic,
|
||||
envVarReferences: make(map[string]bool),
|
||||
func parse(data, fp string, pedantic bool) (p *parser, err error) {
|
||||
p = &parser{
|
||||
mapping: make(map[string]any),
|
||||
lx: lex(data),
|
||||
ctxs: make([]any, 0, 4),
|
||||
keys: make([]string, 0, 4),
|
||||
ikeys: make([]item, 0, 4),
|
||||
fp: filepath.Dir(fp),
|
||||
pedantic: pedantic,
|
||||
}
|
||||
}
|
||||
|
||||
func parse(data, fp string, pedantic bool) (*parser, error) {
|
||||
p := newParser(data, fp, pedantic)
|
||||
if err := p.parse(fp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func parseEnv(data string, parent *parser) (*parser, error) {
|
||||
p := newParser(data, "", false)
|
||||
p.envVarReferences = parent.envVarReferences
|
||||
if err := p.parse(""); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *parser) parse(fp string) error {
|
||||
p.pushContext(p.mapping)
|
||||
|
||||
var prevItem item
|
||||
@@ -223,16 +199,16 @@ func (p *parser) parse(fp string) error {
|
||||
// Here we allow the final character to be a bracket '}'
|
||||
// in order to support JSON like configurations.
|
||||
if prevItem.typ == itemKey && prevItem.val != mapEndString {
|
||||
return fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos)
|
||||
return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos)
|
||||
}
|
||||
break
|
||||
}
|
||||
prevItem = it
|
||||
if err := p.processItem(it, fp); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *parser) next() item {
|
||||
@@ -477,18 +453,11 @@ func (p *parser) lookupVariable(varReference string) (any, bool, error) {
|
||||
}
|
||||
|
||||
// If we are here, we have exhausted our context maps and still not found anything.
|
||||
// Detect reference cycles
|
||||
if p.envVarReferences[varReference] {
|
||||
return nil, false, fmt.Errorf("variable reference cycle for '%s'", varReference)
|
||||
}
|
||||
p.envVarReferences[varReference] = true
|
||||
defer delete(p.envVarReferences, varReference)
|
||||
|
||||
// Parse from the environment
|
||||
// Parse from the environment.
|
||||
if vStr, ok := os.LookupEnv(varReference); ok {
|
||||
// Everything we get here will be a string value, so we need to process as a parser would.
|
||||
if subp, err := parseEnv(fmt.Sprintf("%s=%s", pkey, vStr), p); err == nil {
|
||||
v, ok := subp.mapping[pkey]
|
||||
if vmap, err := Parse(fmt.Sprintf("%s=%s", pkey, vStr)); err == nil {
|
||||
v, ok := vmap[pkey]
|
||||
return v, ok, nil
|
||||
} else {
|
||||
return nil, false, err
|
||||
|
||||
1
vendor/github.com/nats-io/nats-server/v2/server/accounts.go
generated
vendored
1
vendor/github.com/nats-io/nats-server/v2/server/accounts.go
generated
vendored
@@ -299,7 +299,6 @@ func (a *Account) shallowCopy(na *Account) {
|
||||
na.Nkey = a.Nkey
|
||||
na.Issuer = a.Issuer
|
||||
na.traceDest, na.traceDestSampling = a.traceDest, a.traceDestSampling
|
||||
na.nrgAccount = a.nrgAccount
|
||||
|
||||
if a.imports.streams != nil {
|
||||
na.imports.streams = make([]*streamImport, 0, len(a.imports.streams))
|
||||
|
||||
2
vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go
generated
vendored
2
vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2023-2025 The NATS Authors
|
||||
// Copyright 2023-2024 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
||||
63
vendor/github.com/nats-io/nats-server/v2/server/client.go
generated
vendored
63
vendor/github.com/nats-io/nats-server/v2/server/client.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2026 The NATS Authors
|
||||
// Copyright 2012-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -36,6 +35,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"slices"
|
||||
|
||||
"github.com/klauspost/compress/s2"
|
||||
"github.com/nats-io/jwt/v2"
|
||||
"github.com/nats-io/nats-server/v2/internal/fastrand"
|
||||
@@ -2731,12 +2732,9 @@ func (c *client) updateS2AutoCompressionLevel(co *CompressionOpts, compression *
|
||||
}
|
||||
|
||||
// Will return the parts from the raw wire msg.
|
||||
// We return the `hdr` as a slice that is capped to the length of the headers
|
||||
// so that if the caller later tries to append to the returned header slice it
|
||||
// does not affect the message content.
|
||||
func (c *client) msgParts(data []byte) (hdr []byte, msg []byte) {
|
||||
if c != nil && c.pa.hdr > 0 {
|
||||
return data[:c.pa.hdr:c.pa.hdr], data[c.pa.hdr:]
|
||||
return data[:c.pa.hdr], data[c.pa.hdr:]
|
||||
}
|
||||
return nil, data
|
||||
}
|
||||
@@ -3339,7 +3337,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
|
||||
sub.shadow = nil
|
||||
if len(shadowSubs) > 0 {
|
||||
isSpokeLeaf = c.isSpokeLeafNode()
|
||||
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF || c.kind == JETSTREAM) && c.srv != nil
|
||||
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil
|
||||
}
|
||||
sub.close()
|
||||
c.mu.Unlock()
|
||||
@@ -4567,19 +4565,6 @@ func getHeaderKeyIndex(key string, hdr []byte) int {
|
||||
}
|
||||
}
|
||||
|
||||
// setHeader will replace the value of the first existing key `key`
|
||||
// with the given value `val`, or add this new key at the end of
|
||||
// the headers.
|
||||
//
|
||||
// Note: If the key does not exist, or if it exists but the new value
|
||||
// would make the resulting byte slice larger than the original one,
|
||||
// a new byte slice is returned and the original is left untouched.
|
||||
// This is to prevent situations where caller may have a `hdr` and
|
||||
// `msg` that are the parts of an underlying buffer. Extending the
|
||||
// `hdr` would otherwise overwrite the `msg` part.
|
||||
//
|
||||
// If the new value is smaller, then the original `hdr` byte slice
|
||||
// is modified.
|
||||
func setHeader(key, val string, hdr []byte) []byte {
|
||||
start := getHeaderKeyIndex(key, hdr)
|
||||
if start >= 0 {
|
||||
@@ -4594,45 +4579,15 @@ func setHeader(key, val string, hdr []byte) []byte {
|
||||
return hdr // malformed headers
|
||||
}
|
||||
valEnd += valStart
|
||||
// Length of the existing value (before the `\r`)
|
||||
oldValLen := valEnd - valStart
|
||||
// This is how many extra bytes we need for the new value.
|
||||
// If <= 0, it means that we need less and so will reuse the `hdr` buffer.
|
||||
if extra := len(val) - oldValLen; extra > 0 {
|
||||
// Check that we don't overflow an "int".
|
||||
if rem := math.MaxInt - hdrLen; rem < extra {
|
||||
// We don't grow, and return the existing header.
|
||||
return hdr
|
||||
}
|
||||
// The new size is the old size plus the extra bytes.
|
||||
newHdrSize := hdrLen + extra
|
||||
newHdr := make([]byte, newHdrSize)
|
||||
// Copy the parts from `hdr` and `val` into the new buffer.
|
||||
n := copy(newHdr, hdr[:valStart])
|
||||
n += copy(newHdr[n:], val)
|
||||
copy(newHdr[n:], hdr[valEnd:])
|
||||
return newHdr
|
||||
}
|
||||
// We can write in place since it fits in the existing `hdr` buffer.
|
||||
n := copy(hdr[valStart:], val)
|
||||
n += copy(hdr[valStart+n:], hdr[valEnd:])
|
||||
hdr = hdr[:valStart+n]
|
||||
return hdr
|
||||
suffix := slices.Clone(hdr[valEnd:])
|
||||
newHdr := append(hdr[:valStart], val...)
|
||||
return append(newHdr, suffix...)
|
||||
}
|
||||
if len(hdr) > 0 && bytes.HasSuffix(hdr, []byte("\r\n")) {
|
||||
hdr = hdr[:len(hdr)-2]
|
||||
val += "\r\n"
|
||||
}
|
||||
// Create the new buffer based on length of existing one and
|
||||
// length of the new "<key>: <value>\r\n". Protect against "int" overflow.
|
||||
newSize := uint64(len(hdr)) + uint64(len(key)) + 1 + 1 + uint64(len(val)) + 2
|
||||
if newSize > uint64(math.MaxInt) {
|
||||
// We don't grow, and return the existing header.
|
||||
return hdr
|
||||
}
|
||||
newHdr := make([]byte, 0, int(newSize))
|
||||
newHdr = append(newHdr, hdr...)
|
||||
return fmt.Appendf(newHdr, "%s: %s\r\n", key, val)
|
||||
return fmt.Appendf(hdr, "%s: %s\r\n", key, val)
|
||||
}
|
||||
|
||||
// For bytes.HasPrefix below.
|
||||
|
||||
2
vendor/github.com/nats-io/nats-server/v2/server/const.go
generated
vendored
2
vendor/github.com/nats-io/nats-server/v2/server/const.go
generated
vendored
@@ -66,7 +66,7 @@ func init() {
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.12.4"
|
||||
VERSION = "2.12.3"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
76
vendor/github.com/nats-io/nats-server/v2/server/consumer.go
generated
vendored
76
vendor/github.com/nats-io/nats-server/v2/server/consumer.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2026 The NATS Authors
|
||||
// Copyright 2019-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -1030,11 +1030,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
}
|
||||
if cName != _EMPTY_ {
|
||||
if eo, ok := mset.consumers[cName]; ok {
|
||||
mset.mu.Unlock()
|
||||
if action == ActionCreate {
|
||||
ocfg := eo.config()
|
||||
copyConsumerMetadata(config, &ocfg)
|
||||
if !reflect.DeepEqual(config, &ocfg) {
|
||||
mset.mu.Unlock()
|
||||
return nil, NewJSConsumerAlreadyExistsError()
|
||||
}
|
||||
}
|
||||
@@ -1042,11 +1042,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
if cfg.Retention == WorkQueuePolicy {
|
||||
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
|
||||
if !mset.partitionUnique(cName, subjects) {
|
||||
mset.mu.Unlock()
|
||||
return nil, NewJSConsumerWQConsumerNotUniqueError()
|
||||
}
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
err := eo.updateConfig(config)
|
||||
if err == nil {
|
||||
return eo, nil
|
||||
@@ -1544,6 +1542,7 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
if o.cfg.AckPolicy != AckNone {
|
||||
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
|
||||
o.mu.Unlock()
|
||||
o.deleteWithoutAdvisory()
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -1552,6 +1551,7 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
// Will error if wrong mode to provide feedback to users.
|
||||
if o.reqSub, err = o.subscribeInternal(o.nextMsgSubj, o.processNextMsgReq); err != nil {
|
||||
o.mu.Unlock()
|
||||
o.deleteWithoutAdvisory()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1561,6 +1561,7 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
fcsubj := fmt.Sprintf(jsFlowControl, stream, o.name)
|
||||
if o.fcSub, err = o.subscribeInternal(fcsubj, o.processFlowControl); err != nil {
|
||||
o.mu.Unlock()
|
||||
o.deleteWithoutAdvisory()
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -2400,8 +2401,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
|
||||
|
||||
// Check for Subject Filters update.
|
||||
newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
|
||||
updatedFilters := !subjectSliceEqual(newSubjects, o.subjf.subjects())
|
||||
if updatedFilters {
|
||||
if !subjectSliceEqual(newSubjects, o.subjf.subjects()) {
|
||||
newSubjf := make(subjectFilters, 0, len(newSubjects))
|
||||
for _, newFilter := range newSubjects {
|
||||
fs := &subjectFilter{
|
||||
@@ -2440,18 +2440,16 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
|
||||
// Allowed but considered no-op, [Description, SampleFrequency, MaxWaiting, HeadersOnly]
|
||||
o.cfg = *cfg
|
||||
|
||||
if updatedFilters {
|
||||
// Cleanup messages that lost interest.
|
||||
if o.retention == InterestPolicy {
|
||||
o.mu.Unlock()
|
||||
o.cleanupNoInterestMessages(o.mset, false)
|
||||
o.mu.Lock()
|
||||
}
|
||||
|
||||
// Re-calculate num pending on update.
|
||||
o.streamNumPending()
|
||||
// Cleanup messages that lost interest.
|
||||
if o.retention == InterestPolicy {
|
||||
o.mu.Unlock()
|
||||
o.cleanupNoInterestMessages(o.mset, false)
|
||||
o.mu.Lock()
|
||||
}
|
||||
|
||||
// Re-calculate num pending on update.
|
||||
o.streamNumPending()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -5117,14 +5115,9 @@ func (o *consumer) checkNumPending() (uint64, error) {
|
||||
var state StreamState
|
||||
o.mset.store.FastState(&state)
|
||||
npc := o.numPending()
|
||||
// Make sure we can't report more messages than there are.
|
||||
// TODO(nat): It's not great that this means consumer info has side effects,
|
||||
// since we can't know whether anyone will call it or not. The previous num
|
||||
// pending calculation that this replaces had the same problem though.
|
||||
if o.sseq > state.LastSeq {
|
||||
o.npc = 0
|
||||
} else if npc > 0 {
|
||||
o.npc = int64(min(npc, state.Msgs, state.LastSeq-o.sseq+1))
|
||||
if o.sseq > state.LastSeq && npc > 0 || npc > state.Msgs {
|
||||
// Re-calculate.
|
||||
return o.streamNumPending()
|
||||
}
|
||||
}
|
||||
return o.numPending(), nil
|
||||
@@ -5372,15 +5365,6 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
|
||||
o.pending = make(map[uint64]*Pending)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if p, ok := o.pending[sseq]; ok {
|
||||
// Update timestamp but keep original consumer delivery sequence.
|
||||
// So do not update p.Sequence.
|
||||
p.Timestamp = now.UnixNano()
|
||||
} else {
|
||||
o.pending[sseq] = &Pending{dseq, now.UnixNano()}
|
||||
}
|
||||
|
||||
// We could have a backoff that set a timer higher than what we need for this message.
|
||||
// In that case, reset to lowest backoff required for a message redelivery.
|
||||
minDelay := o.ackWait(0)
|
||||
@@ -5393,10 +5377,18 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
|
||||
}
|
||||
minDelay = o.ackWait(o.cfg.BackOff[bi])
|
||||
}
|
||||
minDeadline := now.Add(minDelay)
|
||||
minDeadline := time.Now().Add(minDelay)
|
||||
if o.ptmr == nil || o.ptmrEnd.After(minDeadline) {
|
||||
o.resetPtmr(minDelay)
|
||||
}
|
||||
|
||||
if p, ok := o.pending[sseq]; ok {
|
||||
// Update timestamp but keep original consumer delivery sequence.
|
||||
// So do not update p.Sequence.
|
||||
p.Timestamp = time.Now().UnixNano()
|
||||
} else {
|
||||
o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()}
|
||||
}
|
||||
}
|
||||
|
||||
// Credit back a failed delivery.
|
||||
@@ -6511,10 +6503,6 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
|
||||
retryAsflr = seq
|
||||
}
|
||||
} else if seq <= dflr {
|
||||
// Store the first entry above our ack floor, so we don't need to look it up again on retryAsflr=0.
|
||||
if retryAsflr == 0 {
|
||||
retryAsflr = seq
|
||||
}
|
||||
// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
|
||||
if _, ok := state.Pending[seq]; !ok {
|
||||
// The filters are already taken into account,
|
||||
@@ -6526,18 +6514,8 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
|
||||
}
|
||||
}
|
||||
// If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it.
|
||||
// However, our ack floor may be lower than the next message we can receive, so we correct it upward if needed.
|
||||
if retryAsflr == 0 {
|
||||
if filters != nil {
|
||||
_, nseq, err = store.LoadNextMsgMulti(filters, asflr+1, &smv)
|
||||
} else {
|
||||
_, nseq, err = store.LoadNextMsg(filter, wc, asflr+1, &smv)
|
||||
}
|
||||
if err == nil {
|
||||
retryAsflr = max(asflr+1, nseq)
|
||||
} else if err == ErrStoreEOF {
|
||||
retryAsflr = ss.LastSeq + 1
|
||||
}
|
||||
retryAsflr = asflr + 1
|
||||
}
|
||||
|
||||
o.mu.Lock()
|
||||
|
||||
371
vendor/github.com/nats-io/nats-server/v2/server/filestore.go
generated
vendored
371
vendor/github.com/nats-io/nats-server/v2/server/filestore.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2026 The NATS Authors
|
||||
// Copyright 2019-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -201,7 +201,7 @@ type fileStore struct {
|
||||
sips int
|
||||
dirty int
|
||||
closing bool
|
||||
closed atomic.Bool // Atomic to reduce contention on ConsumerStores.
|
||||
closed bool
|
||||
fip bool
|
||||
receivedAny bool
|
||||
firstMoved bool
|
||||
@@ -473,18 +473,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
|
||||
}
|
||||
|
||||
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
|
||||
_, err = os.Stat(keyFile)
|
||||
// Either the file should exist (err=nil), or it shouldn't. Any other error is reported.
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
// Make sure we do not have an encrypted store underneath of us but no main key.
|
||||
if fs.prf == nil && err == nil {
|
||||
return nil, errNoMainKey
|
||||
} else if fs.prf != nil && err == nil {
|
||||
// If encryption is configured and the key file exists, recover our keys.
|
||||
if err = fs.recoverAEK(); err != nil {
|
||||
return nil, err
|
||||
if fs.prf == nil {
|
||||
if _, err := os.Stat(keyFile); err == nil {
|
||||
return nil, errNoMainKey
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1792,14 +1784,16 @@ func (fs *fileStore) recoverFullState() (rerr error) {
|
||||
}
|
||||
|
||||
// Decrypt if needed.
|
||||
// We can be setup for encryption but if this is a snapshot restore we will be missing the keyfile
|
||||
// since snapshots strip encryption.
|
||||
if fs.prf != nil && fs.aek != nil {
|
||||
ns := fs.aek.NonceSize()
|
||||
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
|
||||
if err != nil {
|
||||
fs.warn("Stream state error reading encryption key: %v", err)
|
||||
return err
|
||||
if fs.prf != nil {
|
||||
// We can be setup for encryption but if this is a snapshot restore we will be missing the keyfile
|
||||
// since snapshots strip encryption.
|
||||
if err := fs.recoverAEK(); err == nil {
|
||||
ns := fs.aek.NonceSize()
|
||||
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
|
||||
if err != nil {
|
||||
fs.warn("Stream state error reading encryption key: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2352,7 +2346,7 @@ func (fs *fileStore) recoverMsgs() error {
|
||||
if fs.ld != nil {
|
||||
var emptyBlks []*msgBlock
|
||||
for _, mb := range fs.blks {
|
||||
if mb.msgs == 0 && mb.rbytes == 0 && mb != fs.lmb {
|
||||
if mb.msgs == 0 && mb.rbytes == 0 {
|
||||
emptyBlks = append(emptyBlks, mb)
|
||||
}
|
||||
}
|
||||
@@ -2594,7 +2588,7 @@ func copyMsgBlocks(src []*msgBlock) []*msgBlock {
|
||||
func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
|
||||
fs.mu.RLock()
|
||||
lastSeq := fs.state.LastSeq
|
||||
closed := fs.isClosed()
|
||||
closed := fs.closed
|
||||
fs.mu.RUnlock()
|
||||
|
||||
if closed {
|
||||
@@ -2609,51 +2603,20 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
|
||||
fseq := atomic.LoadUint64(&mb.first.seq)
|
||||
lseq := atomic.LoadUint64(&mb.last.seq)
|
||||
|
||||
var (
|
||||
smv StoreMsg
|
||||
cts int64
|
||||
cseq uint64
|
||||
off uint64
|
||||
)
|
||||
var smv StoreMsg
|
||||
ts := t.UnixNano()
|
||||
|
||||
// Using a binary search, but need to be aware of interior deletes in the block.
|
||||
seq := lseq + 1
|
||||
loop:
|
||||
for fseq <= lseq {
|
||||
mid := fseq + (lseq-fseq)/2
|
||||
off = 0
|
||||
// Potentially skip over gaps. We keep the original middle but keep track of a
|
||||
// potential delete range with an offset.
|
||||
for {
|
||||
sm, _, err := mb.fetchMsgNoCopy(mid+off, &smv)
|
||||
if err != nil || sm == nil {
|
||||
off++
|
||||
if mid+off <= lseq {
|
||||
continue
|
||||
} else {
|
||||
// Continue search to the left. Purposely ignore the skipped deletes here.
|
||||
lseq = mid - 1
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
cts = sm.ts
|
||||
cseq = sm.seq
|
||||
break
|
||||
}
|
||||
if cts >= ts {
|
||||
seq = cseq
|
||||
if mid == fseq {
|
||||
break
|
||||
}
|
||||
// Continue search to the left.
|
||||
lseq = mid - 1
|
||||
} else {
|
||||
// Continue search to the right (potentially skipping over interior deletes).
|
||||
fseq = mid + off + 1
|
||||
}
|
||||
// Because sort.Search expects range [0,off), we have to manually
|
||||
// calculate the offset from the first sequence.
|
||||
off := int(lseq - fseq + 1)
|
||||
i := sort.Search(off, func(i int) bool {
|
||||
sm, _, _ := mb.fetchMsgNoCopy(fseq+uint64(i), &smv)
|
||||
return sm != nil && sm.ts >= ts
|
||||
})
|
||||
if i < off {
|
||||
return fseq + uint64(i)
|
||||
}
|
||||
return seq
|
||||
return 0
|
||||
}
|
||||
|
||||
// Find the first matching message against a sublist.
|
||||
@@ -2669,15 +2632,13 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
|
||||
mb.mu.Unlock()
|
||||
}()
|
||||
|
||||
if mb.fssNotLoaded() {
|
||||
// Make sure we have fss loaded.
|
||||
// Need messages loaded from here on out.
|
||||
if mb.cacheNotLoaded() {
|
||||
if err := mb.loadMsgsWithLock(); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
didLoad = true
|
||||
}
|
||||
// Mark fss activity.
|
||||
mb.lsts = ats.AccessTime()
|
||||
|
||||
// Make sure to start at mb.first.seq if fseq < mb.first.seq
|
||||
if seq := atomic.LoadUint64(&mb.first.seq); seq > start {
|
||||
@@ -2685,6 +2646,10 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
|
||||
}
|
||||
lseq := atomic.LoadUint64(&mb.last.seq)
|
||||
|
||||
if sm == nil {
|
||||
sm = new(StoreMsg)
|
||||
}
|
||||
|
||||
// If the FSS state has fewer entries than sequences in the linear scan,
|
||||
// then use intersection instead as likely going to be cheaper. This will
|
||||
// often be the case with high numbers of deletes, as well as a smaller
|
||||
@@ -2692,11 +2657,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
|
||||
if uint64(mb.fss.Size()) < lseq-start {
|
||||
// If there are no subject matches then this is effectively no-op.
|
||||
hseq := uint64(math.MaxUint64)
|
||||
var ierr error
|
||||
stree.IntersectGSL(mb.fss, sl, func(subj []byte, ss *SimpleState) {
|
||||
if ierr != nil {
|
||||
return
|
||||
}
|
||||
gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) {
|
||||
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
|
||||
// mb is already loaded into the cache so should be fast-ish.
|
||||
mb.recalculateForSubj(bytesToString(subj), ss)
|
||||
@@ -2708,16 +2669,6 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
|
||||
// than our first seq for this subject.
|
||||
return
|
||||
}
|
||||
// Need messages loaded from here on out.
|
||||
if mb.cacheNotLoaded() {
|
||||
if ierr = mb.loadMsgsWithLock(); ierr != nil {
|
||||
return
|
||||
}
|
||||
didLoad = true
|
||||
}
|
||||
if sm == nil {
|
||||
sm = new(StoreMsg)
|
||||
}
|
||||
if first == ss.First {
|
||||
// If the start floor is below where this subject starts then we can
|
||||
// short-circuit, avoiding needing to scan for the next message.
|
||||
@@ -2752,24 +2703,10 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *
|
||||
mb.llseq = llseq
|
||||
}
|
||||
})
|
||||
if ierr != nil {
|
||||
return nil, false, ierr
|
||||
}
|
||||
if hseq < uint64(math.MaxUint64) && sm != nil {
|
||||
return sm, didLoad && start == lseq, nil
|
||||
}
|
||||
} else {
|
||||
// Need messages loaded from here on out.
|
||||
if mb.cacheNotLoaded() {
|
||||
if err := mb.loadMsgsWithLock(); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
didLoad = true
|
||||
}
|
||||
if sm == nil {
|
||||
sm = new(StoreMsg)
|
||||
}
|
||||
|
||||
for seq := start; seq <= lseq; seq++ {
|
||||
if mb.dmap.Exists(seq) {
|
||||
// Optimisation to avoid calling cacheLookup which hits time.Now().
|
||||
@@ -2836,10 +2773,6 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
|
||||
isAll = true
|
||||
})
|
||||
}
|
||||
// If the only subject in this block isn't our filter, can simply short-circuit.
|
||||
if !isAll {
|
||||
return nil, didLoad, ErrStoreMsgNotFound
|
||||
}
|
||||
}
|
||||
// Make sure to start at mb.first.seq if fseq < mb.first.seq
|
||||
fseq = max(fseq, atomic.LoadUint64(&mb.first.seq))
|
||||
@@ -2976,7 +2909,7 @@ func (mb *msgBlock) prevMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *S
|
||||
if uint64(mb.fss.Size()) < start-lseq {
|
||||
// If there are no subject matches then this is effectively no-op.
|
||||
hseq := uint64(0)
|
||||
stree.IntersectGSL(mb.fss, sl, func(subj []byte, ss *SimpleState) {
|
||||
gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) {
|
||||
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
|
||||
// mb is already loaded into the cache so should be fast-ish.
|
||||
mb.recalculateForSubj(bytesToString(subj), ss)
|
||||
@@ -3255,30 +3188,6 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool, bi int) (int, e
|
||||
if start == uint32(math.MaxUint32) {
|
||||
return -1, ErrStoreEOF
|
||||
}
|
||||
return fs.selectSkipFirstBlock(bi, start, stop)
|
||||
}
|
||||
|
||||
// This is used to see if we can selectively jump start blocks based on filter subjects and a starting block index.
|
||||
// Will return -1 and ErrStoreEOF if no matches at all or no more from where we are.
|
||||
func (fs *fileStore) checkSkipFirstBlockMulti(sl *gsl.SimpleSublist, bi int) (int, error) {
|
||||
// Move through psim to gather start and stop bounds.
|
||||
start, stop := uint32(math.MaxUint32), uint32(0)
|
||||
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
|
||||
if psi.fblk < start {
|
||||
start = psi.fblk
|
||||
}
|
||||
if psi.lblk > stop {
|
||||
stop = psi.lblk
|
||||
}
|
||||
})
|
||||
// Nothing was found.
|
||||
if start == uint32(math.MaxUint32) {
|
||||
return -1, ErrStoreEOF
|
||||
}
|
||||
return fs.selectSkipFirstBlock(bi, start, stop)
|
||||
}
|
||||
|
||||
func (fs *fileStore) selectSkipFirstBlock(bi int, start, stop uint32) (int, error) {
|
||||
// Can not be nil so ok to inline dereference.
|
||||
mbi := fs.blks[bi].getIndex()
|
||||
// All matching msgs are behind us.
|
||||
@@ -4097,7 +4006,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
|
||||
mb := fs.blks[seqStart]
|
||||
bi := mb.index
|
||||
|
||||
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
|
||||
gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) {
|
||||
// If the select blk start is greater than entry's last blk skip.
|
||||
if bi > psi.lblk {
|
||||
return
|
||||
@@ -4196,7 +4105,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
|
||||
var t uint64
|
||||
var havePartial bool
|
||||
var updateLLTS bool
|
||||
stree.IntersectGSL[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
|
||||
gsl.IntersectStree[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
|
||||
subj := bytesToString(bsubj)
|
||||
if havePartial {
|
||||
// If we already found a partial then don't do anything else.
|
||||
@@ -4259,7 +4168,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
|
||||
|
||||
// If we are here it's better to calculate totals from psim and adjust downward by scanning less blocks.
|
||||
start := uint32(math.MaxUint32)
|
||||
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
|
||||
gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) {
|
||||
total += psi.total
|
||||
// Keep track of start index for this subject.
|
||||
if psi.fblk < start {
|
||||
@@ -4310,7 +4219,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer
|
||||
}
|
||||
// Mark fss activity.
|
||||
mb.lsts = ats.AccessTime()
|
||||
stree.IntersectGSL(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
|
||||
gsl.IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
|
||||
adjust += ss.Msgs
|
||||
})
|
||||
}
|
||||
@@ -4577,7 +4486,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
|
||||
// Stores a raw message with expected sequence number and timestamp.
|
||||
// Lock should be held.
|
||||
func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, ttl int64) (err error) {
|
||||
if fs.isClosed() {
|
||||
if fs.closed {
|
||||
return ErrStoreClosed
|
||||
}
|
||||
|
||||
@@ -5250,7 +5159,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
|
||||
|
||||
fsLock()
|
||||
|
||||
if fs.isClosed() {
|
||||
if fs.closed {
|
||||
fsUnlock()
|
||||
return false, ErrStoreClosed
|
||||
}
|
||||
@@ -5460,8 +5369,14 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
|
||||
// If we have a callback registered we need to release lock regardless since cb might need it to lookup msg, etc.
|
||||
fs.mu.Unlock()
|
||||
// Storage updates.
|
||||
delta := int64(msz)
|
||||
cb(-1, -delta, seq, sm.subj)
|
||||
if cb != nil {
|
||||
var subj string
|
||||
if sm != nil {
|
||||
subj = sm.subj
|
||||
}
|
||||
delta := int64(msz)
|
||||
cb(-1, -delta, seq, subj)
|
||||
}
|
||||
|
||||
if !needFSLock {
|
||||
fs.mu.Lock()
|
||||
@@ -5696,7 +5611,10 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
|
||||
}
|
||||
|
||||
func (fs *fileStore) isClosed() bool {
|
||||
return fs.closed.Load()
|
||||
fs.mu.RLock()
|
||||
closed := fs.closed
|
||||
fs.mu.RUnlock()
|
||||
return closed
|
||||
}
|
||||
|
||||
// Will spin up our flush loop.
|
||||
@@ -7077,10 +6995,11 @@ func (mb *msgBlock) ensureRawBytesLoaded() error {
|
||||
|
||||
// Sync msg and index files as needed. This is called from a timer.
|
||||
func (fs *fileStore) syncBlocks() {
|
||||
if fs.isClosed() {
|
||||
fs.mu.Lock()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return
|
||||
}
|
||||
fs.mu.Lock()
|
||||
blks := append([]*msgBlock(nil), fs.blks...)
|
||||
lmb, firstMoved, firstSeq := fs.lmb, fs.firstMoved, fs.state.FirstSeq
|
||||
// Clear first moved.
|
||||
@@ -7170,10 +7089,11 @@ func (fs *fileStore) syncBlocks() {
|
||||
}
|
||||
}
|
||||
|
||||
if fs.isClosed() {
|
||||
fs.mu.Lock()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return
|
||||
}
|
||||
fs.mu.Lock()
|
||||
fs.setSyncTimer()
|
||||
if markDirty {
|
||||
fs.dirty++
|
||||
@@ -8019,14 +7939,17 @@ func (fs *fileStore) msgForSeq(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
|
||||
|
||||
// Will return message for the given sequence number.
|
||||
func (fs *fileStore) msgForSeqLocked(seq uint64, sm *StoreMsg, needFSLock bool) (*StoreMsg, error) {
|
||||
if fs.isClosed() {
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
// TODO(dlc) - Since Store, Remove, Skip all hold the write lock on fs this will
|
||||
// be stalled. Need another lock if want to happen in parallel.
|
||||
if needFSLock {
|
||||
fs.mu.RLock()
|
||||
}
|
||||
if fs.closed {
|
||||
if needFSLock {
|
||||
fs.mu.RUnlock()
|
||||
}
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
// Indicates we want first msg.
|
||||
if seq == 0 {
|
||||
seq = fs.state.FirstSeq
|
||||
@@ -8205,14 +8128,10 @@ func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
|
||||
|
||||
// loadLast will load the last message for a subject. Subject should be non empty and not ">".
|
||||
func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err error) {
|
||||
if fs.isClosed() {
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
|
||||
if fs.lmb == nil {
|
||||
if fs.closed || fs.lmb == nil {
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
|
||||
@@ -8309,15 +8228,15 @@ func (fs *fileStore) LoadLastMsg(subject string, smv *StoreMsg) (sm *StoreMsg, e
|
||||
|
||||
// LoadNextMsgMulti will find the next message matching any entry in the sublist.
|
||||
func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
|
||||
if fs.isClosed() {
|
||||
return nil, 0, ErrStoreClosed
|
||||
}
|
||||
if sl == nil {
|
||||
return fs.LoadNextMsg(_EMPTY_, false, start, smp)
|
||||
}
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
|
||||
if fs.closed {
|
||||
return nil, 0, ErrStoreClosed
|
||||
}
|
||||
if fs.state.Msgs == 0 || start > fs.state.LastSeq {
|
||||
return nil, fs.state.LastSeq, ErrStoreEOF
|
||||
}
|
||||
@@ -8325,31 +8244,6 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
|
||||
start = fs.state.FirstSeq
|
||||
}
|
||||
|
||||
// If start is less than or equal to beginning of our stream, meaning our first call,
|
||||
// let's check the psim to see if we can skip ahead.
|
||||
if start <= fs.state.FirstSeq {
|
||||
var total uint64
|
||||
blkStart := uint32(math.MaxUint32)
|
||||
stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) {
|
||||
total += psi.total
|
||||
// Keep track of start index for this subject.
|
||||
if psi.fblk < blkStart {
|
||||
blkStart = psi.fblk
|
||||
}
|
||||
})
|
||||
// Nothing available.
|
||||
if total == 0 {
|
||||
return nil, fs.state.LastSeq, ErrStoreEOF
|
||||
}
|
||||
// We can skip ahead.
|
||||
if mb := fs.bim[blkStart]; mb != nil {
|
||||
fseq := atomic.LoadUint64(&mb.first.seq)
|
||||
if fseq > start {
|
||||
start = fseq
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
|
||||
for i := bi; i < len(fs.blks); i++ {
|
||||
mb := fs.blks[i]
|
||||
@@ -8360,28 +8254,8 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
|
||||
return sm, sm.seq, nil
|
||||
} else if err != ErrStoreMsgNotFound {
|
||||
return nil, 0, err
|
||||
} else {
|
||||
// Nothing found in this block. We missed, if first block (bi) check psim.
|
||||
// Similar to above if start <= first seq.
|
||||
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
|
||||
// We should not do this at all if we are already on the last block.
|
||||
if i == bi && i < len(fs.blks)-1 {
|
||||
nbi, err := fs.checkSkipFirstBlockMulti(sl, bi)
|
||||
// Nothing available.
|
||||
if err == ErrStoreEOF {
|
||||
return nil, fs.state.LastSeq, ErrStoreEOF
|
||||
}
|
||||
// See if we can jump ahead here.
|
||||
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
|
||||
// For v2 will track all blocks that have matches for psim.
|
||||
if nbi > i {
|
||||
i = nbi - 1 // For the iterator condition i++
|
||||
}
|
||||
}
|
||||
// Check if we can expire.
|
||||
if expireOk {
|
||||
mb.tryForceExpireCache()
|
||||
}
|
||||
} else if expireOk {
|
||||
mb.tryForceExpireCache()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8391,13 +8265,12 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
|
||||
}
|
||||
|
||||
func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, uint64, error) {
|
||||
if fs.isClosed() {
|
||||
return nil, 0, ErrStoreClosed
|
||||
}
|
||||
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
|
||||
if fs.closed {
|
||||
return nil, 0, ErrStoreClosed
|
||||
}
|
||||
if fs.state.Msgs == 0 || start > fs.state.LastSeq {
|
||||
return nil, fs.state.LastSeq, ErrStoreEOF
|
||||
}
|
||||
@@ -8450,7 +8323,7 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
|
||||
i = nbi - 1 // For the iterator condition i++
|
||||
}
|
||||
}
|
||||
// Check if we can expire.
|
||||
// Check is we can expire.
|
||||
if expireOk {
|
||||
mb.tryForceExpireCache()
|
||||
}
|
||||
@@ -8463,13 +8336,12 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
|
||||
|
||||
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
|
||||
func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
|
||||
if fs.isClosed() {
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
|
||||
if fs.closed {
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
|
||||
return nil, ErrStoreEOF
|
||||
}
|
||||
@@ -8517,10 +8389,6 @@ func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err
|
||||
|
||||
// LoadPrevMsgMulti will find the previous message matching any entry in the sublist.
|
||||
func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
|
||||
if fs.isClosed() {
|
||||
return nil, 0, ErrStoreClosed
|
||||
}
|
||||
|
||||
if sl == nil {
|
||||
sm, err = fs.LoadPrevMsg(start, smp)
|
||||
return
|
||||
@@ -8528,6 +8396,9 @@ func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
|
||||
if fs.closed {
|
||||
return nil, 0, ErrStoreClosed
|
||||
}
|
||||
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
|
||||
return nil, fs.state.FirstSeq, ErrStoreEOF
|
||||
}
|
||||
@@ -9081,12 +8952,12 @@ func (fs *fileStore) Purge() (uint64, error) {
|
||||
}
|
||||
|
||||
func (fs *fileStore) purge(fseq uint64) (uint64, error) {
|
||||
if fs.isClosed() {
|
||||
fs.mu.Lock()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return 0, ErrStoreClosed
|
||||
}
|
||||
|
||||
fs.mu.Lock()
|
||||
|
||||
purged := fs.state.Msgs
|
||||
rbytes := int64(fs.state.Bytes)
|
||||
|
||||
@@ -9194,11 +9065,6 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
|
||||
fs.mu.Unlock()
|
||||
return fs.purge(seq)
|
||||
}
|
||||
// Short-circuit if the store was already compacted past this point.
|
||||
if fs.state.FirstSeq > seq {
|
||||
fs.mu.Unlock()
|
||||
return purged, nil
|
||||
}
|
||||
// We have to delete interior messages.
|
||||
smb := fs.selectMsgBlock(seq)
|
||||
if smb == nil {
|
||||
@@ -9246,12 +9112,7 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
|
||||
if err = smb.loadMsgsWithLock(); err != nil {
|
||||
goto SKIP
|
||||
}
|
||||
defer func() {
|
||||
// The lock is released once we get here, so need to re-acquire.
|
||||
smb.mu.Lock()
|
||||
smb.finishedWithCache()
|
||||
smb.mu.Unlock()
|
||||
}()
|
||||
defer smb.finishedWithCache()
|
||||
}
|
||||
for mseq := atomic.LoadUint64(&smb.first.seq); mseq < seq; mseq++ {
|
||||
sm, err := smb.cacheLookupNoCopy(mseq, &smv)
|
||||
@@ -9429,12 +9290,12 @@ SKIP:
|
||||
|
||||
// Will completely reset our store.
|
||||
func (fs *fileStore) reset() error {
|
||||
if fs.isClosed() {
|
||||
fs.mu.Lock()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return ErrStoreClosed
|
||||
}
|
||||
|
||||
fs.mu.Lock()
|
||||
|
||||
var purged, bytes uint64
|
||||
cb := fs.scb
|
||||
|
||||
@@ -9520,10 +9381,6 @@ func (mb *msgBlock) tombsLocked() []msgId {
|
||||
|
||||
// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
|
||||
func (fs *fileStore) Truncate(seq uint64) error {
|
||||
if fs.isClosed() {
|
||||
return ErrStoreClosed
|
||||
}
|
||||
|
||||
// Check for request to reset.
|
||||
if seq == 0 {
|
||||
return fs.reset()
|
||||
@@ -9531,6 +9388,11 @@ func (fs *fileStore) Truncate(seq uint64) error {
|
||||
|
||||
fs.mu.Lock()
|
||||
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return ErrStoreClosed
|
||||
}
|
||||
|
||||
// Any existing state file will no longer be applicable. We will force write a new one
|
||||
// at the end, after we release the lock.
|
||||
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
|
||||
@@ -9820,14 +9682,6 @@ func (fs *fileStore) purgeMsgBlock(mb *msgBlock) {
|
||||
mb.finishedWithCache()
|
||||
mb.mu.Unlock()
|
||||
fs.selectNextFirst()
|
||||
|
||||
if cb := fs.scb; cb != nil {
|
||||
// If we have a callback registered, we need to release lock regardless since consumers will recalculate pending.
|
||||
fs.mu.Unlock()
|
||||
// Storage updates.
|
||||
cb(-int64(msgs), -int64(bytes), 0, _EMPTY_)
|
||||
fs.mu.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
// Called by purge to simply get rid of the cache and close our fds.
|
||||
@@ -10039,9 +9893,6 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
|
||||
func (fs *fileStore) resetGlobalPerSubjectInfo() {
|
||||
// Clear any global subject state.
|
||||
fs.psim, fs.tsl = fs.psim.Empty(), 0
|
||||
if fs.noTrackSubjects() {
|
||||
return
|
||||
}
|
||||
for _, mb := range fs.blks {
|
||||
fs.populateGlobalPerSubjectInfo(mb)
|
||||
}
|
||||
@@ -10379,10 +10230,6 @@ func (fs *fileStore) forceWriteFullStateLocked() error {
|
||||
// 3. MBs - Index, Bytes, First and Last Sequence and Timestamps, and the deleted map (avl.seqset).
|
||||
// 4. Last block index and hash of record inclusive to this stream state.
|
||||
func (fs *fileStore) _writeFullState(force, needLock bool) error {
|
||||
if fs.isClosed() {
|
||||
return nil
|
||||
}
|
||||
|
||||
fsLock := func() {
|
||||
if needLock {
|
||||
fs.mu.Lock()
|
||||
@@ -10396,7 +10243,7 @@ func (fs *fileStore) _writeFullState(force, needLock bool) error {
|
||||
|
||||
start := time.Now()
|
||||
fsLock()
|
||||
if fs.dirty == 0 {
|
||||
if fs.closed || fs.dirty == 0 {
|
||||
fsUnlock()
|
||||
return nil
|
||||
}
|
||||
@@ -10647,12 +10494,8 @@ func (fs *fileStore) Stop() error {
|
||||
|
||||
// Stop the current filestore.
|
||||
func (fs *fileStore) stop(delete, writeState bool) error {
|
||||
if fs.isClosed() {
|
||||
return ErrStoreClosed
|
||||
}
|
||||
|
||||
fs.mu.Lock()
|
||||
if fs.closing {
|
||||
if fs.closed || fs.closing {
|
||||
fs.mu.Unlock()
|
||||
return ErrStoreClosed
|
||||
}
|
||||
@@ -10688,7 +10531,7 @@ func (fs *fileStore) stop(delete, writeState bool) error {
|
||||
|
||||
// Mark as closed. Last message block needs to be cleared after
|
||||
// writeFullState has completed.
|
||||
fs.closed.Store(true)
|
||||
fs.closed = true
|
||||
fs.lmb = nil
|
||||
|
||||
// We should update the upper usage layer on a stop.
|
||||
@@ -10907,12 +10750,11 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool, err
|
||||
|
||||
// Create a snapshot of this stream and its consumer's state along with messages.
|
||||
func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumers bool) (*SnapshotResult, error) {
|
||||
if fs.isClosed() {
|
||||
fs.mu.Lock()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
|
||||
fs.mu.Lock()
|
||||
|
||||
// Only allow one at a time.
|
||||
if fs.sips > 0 {
|
||||
fs.mu.Unlock()
|
||||
@@ -11232,9 +11074,7 @@ func (fs *fileStore) ConsumerStore(name string, created time.Time, cfg *Consumer
|
||||
go o.flushLoop(o.fch, o.qch)
|
||||
|
||||
// Make sure to load in our state from disk if needed.
|
||||
if err = o.loadState(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o.loadState()
|
||||
|
||||
// Assign to filestore.
|
||||
fs.AddConsumer(o)
|
||||
@@ -11924,15 +11764,10 @@ func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, er
|
||||
}
|
||||
|
||||
// Lock should be held. Called at startup.
|
||||
func (o *consumerFileStore) loadState() error {
|
||||
func (o *consumerFileStore) loadState() {
|
||||
if _, err := os.Stat(o.ifn); err == nil {
|
||||
// This will load our state in from disk.
|
||||
_, err = o.stateWithCopyLocked(false)
|
||||
return err
|
||||
} else if os.IsNotExist(err) {
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
o.stateWithCopyLocked(false)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
126
vendor/github.com/nats-io/nats-server/v2/server/gsl/gsl.go
generated
vendored
126
vendor/github.com/nats-io/nats-server/v2/server/gsl/gsl.go
generated
vendored
@@ -17,6 +17,9 @@ import (
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server/stree"
|
||||
)
|
||||
|
||||
// Sublist is a routing mechanism to handle subject distribution and
|
||||
@@ -248,9 +251,7 @@ func matchLevelForAny[T comparable](l *level[T], toks []string, np *int) bool {
|
||||
if np != nil {
|
||||
*np += len(n.subs)
|
||||
}
|
||||
if len(n.subs) > 0 {
|
||||
return true
|
||||
}
|
||||
return len(n.subs) > 0
|
||||
}
|
||||
if pwc != nil {
|
||||
if np != nil {
|
||||
@@ -369,36 +370,6 @@ func (s *GenericSublist[T]) Remove(subject string, value T) error {
|
||||
return s.remove(subject, value, true)
|
||||
}
|
||||
|
||||
// HasInterestStartingIn is a helper for subject tree intersection.
|
||||
func (s *GenericSublist[T]) HasInterestStartingIn(subj string) bool {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
var _tokens [64]string
|
||||
tokens := tokenizeSubjectIntoSlice(_tokens[:0], subj)
|
||||
return hasInterestStartingIn(s.root, tokens)
|
||||
}
|
||||
|
||||
func hasInterestStartingIn[T comparable](l *level[T], tokens []string) bool {
|
||||
if l == nil {
|
||||
return false
|
||||
}
|
||||
if len(tokens) == 0 {
|
||||
return true
|
||||
}
|
||||
token := tokens[0]
|
||||
if l.fwc != nil {
|
||||
return true
|
||||
}
|
||||
found := false
|
||||
if pwc := l.pwc; pwc != nil {
|
||||
found = found || hasInterestStartingIn(pwc.next, tokens[1:])
|
||||
}
|
||||
if n := l.nodes[token]; n != nil {
|
||||
found = found || hasInterestStartingIn(n.next, tokens[1:])
|
||||
}
|
||||
return found
|
||||
}
|
||||
|
||||
// pruneNode is used to prune an empty node from the tree.
|
||||
func (l *level[T]) pruneNode(n *node[T], t string) {
|
||||
if n == nil {
|
||||
@@ -492,15 +463,86 @@ func visitLevel[T comparable](l *level[T], depth int) int {
|
||||
return maxDepth
|
||||
}
|
||||
|
||||
// use similar to append. meaning, the updated slice will be returned
|
||||
func tokenizeSubjectIntoSlice(tts []string, subject string) []string {
|
||||
start := 0
|
||||
for i := 0; i < len(subject); i++ {
|
||||
if subject[i] == btsep {
|
||||
tts = append(tts, subject[start:i])
|
||||
start = i + 1
|
||||
// IntersectStree will match all items in the given subject tree that
|
||||
// have interest expressed in the given sublist. The callback will only be called
|
||||
// once for each subject, regardless of overlapping subscriptions in the sublist.
|
||||
func IntersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], sl *GenericSublist[T2], cb func(subj []byte, entry *T1)) {
|
||||
var _subj [255]byte
|
||||
intersectStree(st, sl.root, _subj[:0], cb)
|
||||
}
|
||||
|
||||
func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T2], subj []byte, cb func(subj []byte, entry *T1)) {
|
||||
nsubj := subj
|
||||
if len(nsubj) > 0 {
|
||||
nsubj = append(subj, '.')
|
||||
}
|
||||
if r.fwc != nil {
|
||||
// We've reached a full wildcard, do a FWC match on the stree at this point
|
||||
// and don't keep iterating downward.
|
||||
nsubj := append(nsubj, '>')
|
||||
st.Match(nsubj, cb)
|
||||
return
|
||||
}
|
||||
if r.pwc != nil {
|
||||
// We've found a partial wildcard. We'll keep iterating downwards, but first
|
||||
// check whether there's interest at this level (without triggering dupes) and
|
||||
// match if so.
|
||||
var done bool
|
||||
nsubj := append(nsubj, '*')
|
||||
if len(r.pwc.subs) > 0 {
|
||||
st.Match(nsubj, cb)
|
||||
done = true
|
||||
}
|
||||
if r.pwc.next.numNodes() > 0 {
|
||||
intersectStree(st, r.pwc.next, nsubj, cb)
|
||||
}
|
||||
if done {
|
||||
return
|
||||
}
|
||||
}
|
||||
// Normal node with subject literals, keep iterating.
|
||||
for t, n := range r.nodes {
|
||||
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
|
||||
// A wildcard at the next level will already visit these descendents
|
||||
// so skip so we don't callback the same subject more than once.
|
||||
continue
|
||||
}
|
||||
nsubj := append(nsubj, t...)
|
||||
if len(n.subs) > 0 {
|
||||
if subjectHasWildcard(bytesToString(nsubj)) {
|
||||
st.Match(nsubj, cb)
|
||||
} else {
|
||||
if e, ok := st.Find(nsubj); ok {
|
||||
cb(nsubj, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
if n.next.numNodes() > 0 {
|
||||
intersectStree(st, n.next, nsubj, cb)
|
||||
}
|
||||
}
|
||||
tts = append(tts, subject[start:])
|
||||
return tts
|
||||
}
|
||||
|
||||
// Determine if a subject has any wildcard tokens.
|
||||
func subjectHasWildcard(subject string) bool {
|
||||
// This one exits earlier then !subjectIsLiteral(subject)
|
||||
for i, c := range subject {
|
||||
if c == pwc || c == fwc {
|
||||
if (i == 0 || subject[i-1] == btsep) &&
|
||||
(i+1 == len(subject) || subject[i+1] == btsep) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer.
|
||||
// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same.
|
||||
func bytesToString(b []byte) string {
|
||||
if len(b) == 0 {
|
||||
return _EMPTY_
|
||||
}
|
||||
p := unsafe.SliceData(b)
|
||||
return unsafe.String(p, len(b))
|
||||
}
|
||||
|
||||
13
vendor/github.com/nats-io/nats-server/v2/server/jetstream.go
generated
vendored
13
vendor/github.com/nats-io/nats-server/v2/server/jetstream.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2026 The NATS Authors
|
||||
// Copyright 2019-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -1142,12 +1142,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
|
||||
}
|
||||
|
||||
js.mu.Lock()
|
||||
// Accounts get reset to nil on shutdown, since we re-acquire the locks here, we need to check again.
|
||||
if js.accounts == nil {
|
||||
js.mu.Unlock()
|
||||
return NewJSNotEnabledError()
|
||||
}
|
||||
|
||||
if jsa, ok := js.accounts[a.Name]; ok {
|
||||
a.mu.Lock()
|
||||
a.js = jsa
|
||||
@@ -1376,7 +1370,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
|
||||
}
|
||||
obs, err := mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false)
|
||||
if err != nil {
|
||||
s.Warnf(" Error adding consumer '%s > %s > %s': %v", a.Name, mset.name(), cfg.Name, err)
|
||||
s.Warnf(" Error adding consumer %q: %v", cfg.Name, err)
|
||||
continue
|
||||
}
|
||||
if isEphemeral {
|
||||
@@ -1385,6 +1379,9 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
|
||||
if !cfg.Created.IsZero() {
|
||||
obs.setCreatedTime(cfg.Created)
|
||||
}
|
||||
if err != nil {
|
||||
s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
283
vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go
generated
vendored
283
vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2020-2026 The NATS Authors
|
||||
// Copyright 2020-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -1296,6 +1296,11 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
|
||||
var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -1314,12 +1319,6 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if !doErr {
|
||||
return
|
||||
@@ -1613,6 +1612,11 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
|
||||
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -1631,12 +1635,6 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -1731,6 +1729,11 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
|
||||
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -1749,12 +1752,6 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -1835,6 +1832,11 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
|
||||
var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -1853,12 +1855,6 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -1971,6 +1967,11 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
|
||||
ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
|
||||
Streams: []*StreamInfo{},
|
||||
}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -1989,12 +1990,6 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -2095,6 +2090,11 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
|
||||
resp.ApiResponse.Type = JSApiStreamCreateResponseType
|
||||
}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
var clusterWideConsCount int
|
||||
|
||||
@@ -2184,12 +2184,6 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -2315,6 +2309,11 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
|
||||
name := tokenAt(subject, 6)
|
||||
|
||||
var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// If we are not in clustered mode this is a failed request.
|
||||
if !s.JetStreamIsClustered() {
|
||||
@@ -2346,12 +2345,6 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
|
||||
return
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -2428,6 +2421,11 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
|
||||
}
|
||||
|
||||
var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// If we are not in clustered mode this is a failed request.
|
||||
if !s.JetStreamIsClustered() {
|
||||
@@ -2462,13 +2460,6 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
|
||||
} else if sa == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
var ca *consumerAssignment
|
||||
if sa.consumers != nil {
|
||||
ca = sa.consumers[consumer]
|
||||
@@ -2556,6 +2547,11 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco
|
||||
name := tokenAt(subject, 6)
|
||||
|
||||
var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// If we are not in clustered mode this is a failed request.
|
||||
if !s.JetStreamIsClustered() {
|
||||
@@ -2584,12 +2580,6 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco
|
||||
return
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -3082,6 +3072,11 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
|
||||
accName := tokenAt(subject, 5)
|
||||
|
||||
var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if !s.JetStreamIsClustered() {
|
||||
var streams []*stream
|
||||
@@ -3122,12 +3117,6 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
|
||||
return
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if js.isMetaRecovering() {
|
||||
// While in recovery mode, the data structures are not fully initialized
|
||||
resp.Error = NewJSClusterNotAvailError()
|
||||
@@ -3350,6 +3339,11 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
|
||||
var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -3368,12 +3362,6 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -3426,6 +3414,11 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su
|
||||
stream := tokenAt(subject, 6)
|
||||
|
||||
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// If we are in clustered mode we need to be the stream leader to proceed.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -3474,12 +3467,6 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -3551,6 +3538,11 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
|
||||
stream := tokenAt(subject, 6)
|
||||
|
||||
var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// If we are in clustered mode we need to be the stream leader to proceed.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -3599,12 +3591,6 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -3709,8 +3695,31 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account
|
||||
stream := streamNameFromSubject(subject)
|
||||
consumer := consumerNameFromSubject(subject)
|
||||
|
||||
var req JSApiConsumerUnpinRequest
|
||||
var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(msg, &req); err != nil {
|
||||
resp.Error = NewJSInvalidJSONError(err)
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if req.Group == _EMPTY_ {
|
||||
resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if !validGroupName.MatchString(req.Group) {
|
||||
resp.Error = NewJSConsumerInvalidGroupNameError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
if s.JetStreamIsClustered() {
|
||||
// Check to make sure the stream is assigned.
|
||||
js, cc := s.getJetStreamCluster()
|
||||
@@ -3762,31 +3771,6 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
var req JSApiConsumerUnpinRequest
|
||||
if err := json.Unmarshal(msg, &req); err != nil {
|
||||
resp.Error = NewJSInvalidJSONError(err)
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if req.Group == _EMPTY_ {
|
||||
resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if !validGroupName.MatchString(req.Group) {
|
||||
resp.Error = NewJSConsumerInvalidGroupNameError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -3850,6 +3834,11 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
|
||||
stream := streamNameFromSubject(subject)
|
||||
|
||||
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// If we are in clustered mode we need to be the stream leader to proceed.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -3901,12 +3890,6 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -4524,6 +4507,11 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
|
||||
}
|
||||
|
||||
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
var req CreateConsumerRequest
|
||||
if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
|
||||
@@ -4560,20 +4548,6 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var streamName, consumerName, filteredSubject string
|
||||
var rt ccReqType
|
||||
|
||||
@@ -4606,6 +4580,14 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
|
||||
}
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if streamName != req.Stream {
|
||||
resp.Error = NewJSStreamMismatchError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
@@ -4750,6 +4732,11 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
|
||||
ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
|
||||
Consumers: []string{},
|
||||
}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -4768,12 +4755,6 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -4878,6 +4859,11 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
|
||||
ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
|
||||
Consumers: []*ConsumerInfo{},
|
||||
}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -4897,7 +4883,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
resp.Error = NewJSClusterNotAvailError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
@@ -4987,6 +4973,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
|
||||
consumerName := consumerNameFromSubject(subject)
|
||||
|
||||
var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if !isEmptyRequest(msg) {
|
||||
resp.Error = NewJSNotEmptyRequestError()
|
||||
@@ -5137,12 +5128,6 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if !acc.JetStreamEnabled() {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
@@ -5190,6 +5175,11 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun
|
||||
}
|
||||
|
||||
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
@@ -5208,12 +5198,6 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
@@ -5269,6 +5253,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
|
||||
|
||||
var req JSApiConsumerPauseRequest
|
||||
var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if isJSONObjectOrArray(msg) {
|
||||
if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
|
||||
@@ -5296,12 +5285,6 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
|
||||
}
|
||||
}
|
||||
|
||||
if errorOnRequiredApiLevel(hdr) {
|
||||
resp.Error = NewJSRequiredApiLevelError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if hasJS, doErr := acc.checkJetStream(); !hasJS {
|
||||
if doErr {
|
||||
resp.Error = NewJSNotEnabledForAccountError()
|
||||
|
||||
16
vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go
generated
vendored
16
vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go
generated
vendored
@@ -3142,20 +3142,20 @@ func (mset *stream) resetClusteredState(err error) bool {
|
||||
stype, tierName, replicas := mset.cfg.Storage, mset.tier, mset.cfg.Replicas
|
||||
mset.mu.RUnlock()
|
||||
|
||||
// The stream might already be deleted and not assigned to us anymore.
|
||||
// In any case, don't revive the stream if it's already closed.
|
||||
if mset.closed.Load() || (node != nil && node.IsDeleted()) {
|
||||
s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
|
||||
// Explicitly returning true here, we want the outside to break out of the monitoring loop as well.
|
||||
return true
|
||||
}
|
||||
|
||||
assert.Unreachable("Reset clustered state", map[string]any{
|
||||
"stream": name,
|
||||
"account": acc.Name,
|
||||
"err": err,
|
||||
})
|
||||
|
||||
// The stream might already be deleted and not assigned to us anymore.
|
||||
// In any case, don't revive the stream if it's already closed.
|
||||
if mset.closed.Load() {
|
||||
s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
|
||||
// Explicitly returning true here, we want the outside to break out of the monitoring loop as well.
|
||||
return true
|
||||
}
|
||||
|
||||
// Stepdown regardless if we are the leader here.
|
||||
if node != nil {
|
||||
node.StepDown()
|
||||
|
||||
63
vendor/github.com/nats-io/nats-server/v2/server/memstore.go
generated
vendored
63
vendor/github.com/nats-io/nats-server/v2/server/memstore.go
generated
vendored
@@ -19,6 +19,7 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -444,6 +445,7 @@ func (ms *memStore) RegisterProcessJetStreamMsg(cb ProcessJetStreamMsgHandler) {
|
||||
|
||||
// GetSeqFromTime looks for the first sequence number that has the message
|
||||
// with >= timestamp.
|
||||
// FIXME(dlc) - inefficient.
|
||||
func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
|
||||
ts := t.UnixNano()
|
||||
ms.mu.RLock()
|
||||
@@ -467,57 +469,18 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
|
||||
|
||||
last := lmsg.ts
|
||||
if ts == last {
|
||||
return lmsg.seq
|
||||
return ms.state.LastSeq
|
||||
}
|
||||
if ts > last {
|
||||
return ms.state.LastSeq + 1
|
||||
}
|
||||
|
||||
var (
|
||||
cts int64
|
||||
cseq uint64
|
||||
off uint64
|
||||
)
|
||||
|
||||
// Using a binary search, but need to be aware of interior deletes.
|
||||
fseq := ms.state.FirstSeq
|
||||
lseq := ms.state.LastSeq
|
||||
seq := lseq + 1
|
||||
loop:
|
||||
for fseq <= lseq {
|
||||
mid := fseq + (lseq-fseq)/2
|
||||
off = 0
|
||||
// Potentially skip over gaps. We keep the original middle but keep track of a
|
||||
// potential delete range with an offset.
|
||||
for {
|
||||
msg := ms.msgs[mid+off]
|
||||
if msg == nil {
|
||||
off++
|
||||
if mid+off <= lseq {
|
||||
continue
|
||||
} else {
|
||||
// Continue search to the left. Purposely ignore the skipped deletes here.
|
||||
lseq = mid - 1
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
cts = msg.ts
|
||||
cseq = msg.seq
|
||||
break
|
||||
index := sort.Search(len(ms.msgs), func(i int) bool {
|
||||
if msg := ms.msgs[ms.state.FirstSeq+uint64(i)]; msg != nil {
|
||||
return msg.ts >= ts
|
||||
}
|
||||
if cts >= ts {
|
||||
seq = cseq
|
||||
if mid == fseq {
|
||||
break
|
||||
}
|
||||
// Continue search to the left.
|
||||
lseq = mid - 1
|
||||
} else {
|
||||
// Continue search to the right (potentially skipping over interior deletes).
|
||||
fseq = mid + off + 1
|
||||
}
|
||||
}
|
||||
return seq
|
||||
return false
|
||||
})
|
||||
return uint64(index) + ms.state.FirstSeq
|
||||
}
|
||||
|
||||
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
|
||||
@@ -943,7 +906,7 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPerS
|
||||
var havePartial bool
|
||||
var totalSkipped uint64
|
||||
// We will track start and end sequences as we go.
|
||||
stree.IntersectGSL[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
|
||||
gsl.IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
|
||||
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
|
||||
ms.recalculateForSubj(bytesToString(subj), fss)
|
||||
}
|
||||
@@ -1500,12 +1463,6 @@ func (ms *memStore) compact(seq uint64) (uint64, error) {
|
||||
var purged, bytes uint64
|
||||
|
||||
ms.mu.Lock()
|
||||
// Short-circuit if the store was already compacted past this point.
|
||||
if ms.state.FirstSeq > seq {
|
||||
ms.mu.Unlock()
|
||||
return purged, nil
|
||||
}
|
||||
|
||||
cb := ms.scb
|
||||
if seq <= ms.state.LastSeq {
|
||||
fseq := ms.state.FirstSeq
|
||||
|
||||
84
vendor/github.com/nats-io/nats-server/v2/server/monitor.go
generated
vendored
84
vendor/github.com/nats-io/nats-server/v2/server/monitor.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2013-2026 The NATS Authors
|
||||
// Copyright 2013-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -1279,7 +1279,6 @@ type Varz struct {
|
||||
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats are statistics about all detected Slow Consumer
|
||||
StaleConnectionStats *StaleConnectionStats `json:"stale_connection_stats,omitempty"` // StaleConnectionStats are statistics about all detected Stale Connections
|
||||
Proxies *ProxiesOptsVarz `json:"proxies,omitempty"` // Proxies hold information about network proxy devices
|
||||
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate of this server
|
||||
}
|
||||
|
||||
// JetStreamVarz contains basic runtime information about jetstream
|
||||
@@ -1292,36 +1291,34 @@ type JetStreamVarz struct {
|
||||
|
||||
// ClusterOptsVarz contains monitoring cluster information
|
||||
type ClusterOptsVarz struct {
|
||||
Name string `json:"name,omitempty"` // Name is the configured cluster name
|
||||
Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections
|
||||
Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections
|
||||
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
|
||||
URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
|
||||
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
|
||||
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
|
||||
PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size
|
||||
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
|
||||
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
|
||||
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
|
||||
Name string `json:"name,omitempty"` // Name is the configured cluster name
|
||||
Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections
|
||||
Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections
|
||||
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
|
||||
URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
|
||||
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
|
||||
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
|
||||
PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size
|
||||
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
|
||||
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
|
||||
}
|
||||
|
||||
// GatewayOptsVarz contains monitoring gateway information
|
||||
type GatewayOptsVarz struct {
|
||||
Name string `json:"name,omitempty"` // Name is the configured cluster name
|
||||
Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections
|
||||
Port int `json:"port,omitempty"` // Port is the post gateway connections listens on
|
||||
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
|
||||
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
|
||||
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
|
||||
Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients
|
||||
ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
|
||||
Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes
|
||||
RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected
|
||||
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
|
||||
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
|
||||
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificaet
|
||||
Name string `json:"name,omitempty"` // Name is the configured cluster name
|
||||
Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections
|
||||
Port int `json:"port,omitempty"` // Port is the post gateway connections listens on
|
||||
AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
|
||||
TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections
|
||||
TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed
|
||||
Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients
|
||||
ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
|
||||
Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes
|
||||
RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected
|
||||
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
|
||||
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
|
||||
}
|
||||
|
||||
// RemoteGatewayOptsVarz contains monitoring remote gateway information
|
||||
@@ -1343,7 +1340,6 @@ type LeafNodeOptsVarz struct {
|
||||
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed
|
||||
WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
|
||||
WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors
|
||||
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
|
||||
}
|
||||
|
||||
// DenyRules Contains lists of subjects not allowed to be imported/exported
|
||||
@@ -1374,7 +1370,6 @@ type MQTTOptsVarz struct {
|
||||
AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete
|
||||
MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow
|
||||
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
|
||||
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
|
||||
}
|
||||
|
||||
// WebsocketOptsVarz contains monitoring websocket information
|
||||
@@ -1393,7 +1388,6 @@ type WebsocketOptsVarz struct {
|
||||
AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins
|
||||
Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported
|
||||
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
|
||||
TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
|
||||
}
|
||||
|
||||
// OCSPResponseCacheVarz contains OCSP response cache information
|
||||
@@ -1460,22 +1454,6 @@ func myUptime(d time.Duration) string {
|
||||
return fmt.Sprintf("%ds", tsecs)
|
||||
}
|
||||
|
||||
func tlsCertNotAfter(config *tls.Config) time.Time {
|
||||
if config == nil || len(config.Certificates) == 0 {
|
||||
return time.Time{}
|
||||
}
|
||||
cert := config.Certificates[0]
|
||||
leaf := cert.Leaf
|
||||
if leaf == nil {
|
||||
var err error
|
||||
leaf, err = x509.ParseCertificate(cert.Certificate[0])
|
||||
if err != nil {
|
||||
return time.Time{}
|
||||
}
|
||||
}
|
||||
return leaf.NotAfter
|
||||
}
|
||||
|
||||
// HandleRoot will show basic info and links to others handlers.
|
||||
func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
|
||||
// This feels dumb to me, but is required: https://code.google.com/p/go/issues/detail?id=4799
|
||||
@@ -1801,13 +1779,6 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
|
||||
|
||||
v.TLSOCSPPeerVerify = s.ocspPeerVerify && v.TLSRequired && s.opts.tlsConfigOpts != nil && s.opts.tlsConfigOpts.OCSPPeerConfig != nil && s.opts.tlsConfigOpts.OCSPPeerConfig.Verify
|
||||
|
||||
v.TLSCertNotAfter = tlsCertNotAfter(opts.TLSConfig)
|
||||
v.Cluster.TLSCertNotAfter = tlsCertNotAfter(opts.Cluster.TLSConfig)
|
||||
v.Gateway.TLSCertNotAfter = tlsCertNotAfter(opts.Gateway.TLSConfig)
|
||||
v.LeafNode.TLSCertNotAfter = tlsCertNotAfter(opts.LeafNode.TLSConfig)
|
||||
v.MQTT.TLSCertNotAfter = tlsCertNotAfter(opts.MQTT.TLSConfig)
|
||||
v.Websocket.TLSCertNotAfter = tlsCertNotAfter(opts.Websocket.TLSConfig)
|
||||
|
||||
if opts.Proxies != nil {
|
||||
if v.Proxies == nil {
|
||||
v.Proxies = &ProxiesOptsVarz{}
|
||||
@@ -4011,11 +3982,6 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
|
||||
return health
|
||||
}
|
||||
|
||||
// Healthz returns the health status of the server.
|
||||
func (s *Server) Healthz(opts *HealthzOptions) *HealthStatus {
|
||||
return s.healthz(opts)
|
||||
}
|
||||
|
||||
type ExpvarzStatus struct {
|
||||
Memstats json.RawMessage `json:"memstats"`
|
||||
Cmdline json.RawMessage `json:"cmdline"`
|
||||
|
||||
2
vendor/github.com/nats-io/nats-server/v2/server/opts.go
generated
vendored
2
vendor/github.com/nats-io/nats-server/v2/server/opts.go
generated
vendored
@@ -2326,7 +2326,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error {
|
||||
case "cluster_traffic":
|
||||
vv, ok := mv.(string)
|
||||
if !ok {
|
||||
return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'owner' string value for %q, got %v", mk, mv)}
|
||||
return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)}
|
||||
}
|
||||
switch vv {
|
||||
case "system", _EMPTY_:
|
||||
|
||||
29
vendor/github.com/nats-io/nats-server/v2/server/raft.go
generated
vendored
29
vendor/github.com/nats-io/nats-server/v2/server/raft.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2020-2026 The NATS Authors
|
||||
// Copyright 2020-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -83,7 +83,6 @@ type RaftNode interface {
|
||||
Stop()
|
||||
WaitForStop()
|
||||
Delete()
|
||||
IsDeleted() bool
|
||||
RecreateInternalSubs() error
|
||||
IsSystemAccount() bool
|
||||
GetTrafficAccountName() string
|
||||
@@ -232,7 +231,6 @@ type raft struct {
|
||||
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
|
||||
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
|
||||
membChanging bool // There is a membership change proposal in progress
|
||||
deleted bool // If the node was deleted.
|
||||
}
|
||||
|
||||
type proposedEntry struct {
|
||||
@@ -1737,6 +1735,11 @@ func (n *raft) StepDown(preferred ...string) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear our vote state.
|
||||
n.vote = noVote
|
||||
n.writeTermVote()
|
||||
|
||||
n.Unlock()
|
||||
|
||||
if len(preferred) > 0 && maybeLeader == noLeader {
|
||||
@@ -1917,7 +1920,6 @@ func (n *raft) Delete() {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
n.deleted = true
|
||||
if wal := n.wal; wal != nil {
|
||||
wal.Delete(false)
|
||||
}
|
||||
@@ -1925,12 +1927,6 @@ func (n *raft) Delete() {
|
||||
n.debug("Deleted")
|
||||
}
|
||||
|
||||
func (n *raft) IsDeleted() bool {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.deleted
|
||||
}
|
||||
|
||||
func (n *raft) shutdown() {
|
||||
// First call to Stop or Delete should close the quit chan
|
||||
// to notify the runAs goroutines to stop what they're doing.
|
||||
@@ -3405,9 +3401,9 @@ func (n *raft) runAsCandidate() {
|
||||
n.requestVote()
|
||||
|
||||
// We vote for ourselves.
|
||||
n.votes.push(&voteResponse{term: n.term, peer: n.ID(), granted: true})
|
||||
|
||||
votes := map[string]struct{}{}
|
||||
votes := map[string]struct{}{
|
||||
n.ID(): {},
|
||||
}
|
||||
emptyVotes := map[string]struct{}{}
|
||||
|
||||
for n.State() == Candidate {
|
||||
@@ -3972,6 +3968,10 @@ CONTINUE:
|
||||
// Here we can become a leader but need to wait for resume of the apply queue.
|
||||
n.lxfer = true
|
||||
}
|
||||
} else if n.vote != noVote {
|
||||
// Since we are here we are not the chosen one but we should clear any vote preference.
|
||||
n.vote = noVote
|
||||
n.writeTermVote()
|
||||
}
|
||||
}
|
||||
case EntryAddPeer:
|
||||
@@ -4211,9 +4211,6 @@ func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) error {
|
||||
if !shouldStore {
|
||||
ae.returnToPool()
|
||||
}
|
||||
if n.csz == 1 {
|
||||
n.tryCommit(n.pindex)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
87
vendor/github.com/nats-io/nats-server/v2/server/stream.go
generated
vendored
87
vendor/github.com/nats-io/nats-server/v2/server/stream.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2026 The NATS Authors
|
||||
// Copyright 2019-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -425,7 +425,6 @@ type stream struct {
|
||||
active bool // Indicates that there are active internal subscriptions (for the subject filters)
|
||||
// and/or mirror/sources consumers are scheduled to be established or already started.
|
||||
closed atomic.Bool // Set to true when stop() is called on the stream.
|
||||
cisrun atomic.Bool // Indicates one checkInterestState is already running.
|
||||
|
||||
// Mirror
|
||||
mirror *sourceInfo
|
||||
@@ -838,9 +837,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
|
||||
|
||||
// Add created timestamp used for the store, must match that of the stream assignment if it exists.
|
||||
if sa != nil {
|
||||
// The following assignment does not require mutex
|
||||
// protection: sa.Created is immutable.
|
||||
js.mu.RLock()
|
||||
mset.created = sa.Created
|
||||
js.mu.RUnlock()
|
||||
}
|
||||
|
||||
// Start our signaling routine to process consumers.
|
||||
@@ -1822,7 +1821,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
|
||||
// check for duplicates
|
||||
var iNames = make(map[string]struct{})
|
||||
for _, src := range cfg.Sources {
|
||||
if src == nil || !isValidName(src.Name) {
|
||||
if !isValidName(src.Name) {
|
||||
return StreamConfig{}, NewJSSourceInvalidStreamNameError()
|
||||
}
|
||||
if _, ok := iNames[src.composeIName()]; !ok {
|
||||
@@ -3163,6 +3162,7 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
}
|
||||
|
||||
mirror := mset.mirror
|
||||
mirrorWg := &mirror.wg
|
||||
|
||||
// We want to throttle here in terms of how fast we request new consumers,
|
||||
// or if the previous is still in progress.
|
||||
@@ -3321,16 +3321,7 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
|
||||
// Wait for previous processMirrorMsgs go routine to be completely done.
|
||||
// If none is running, this will not block.
|
||||
mset.mu.Lock()
|
||||
if mset.mirror == nil {
|
||||
// Mirror config has been removed.
|
||||
mset.mu.Unlock()
|
||||
return
|
||||
} else {
|
||||
wg := &mset.mirror.wg
|
||||
mset.mu.Unlock()
|
||||
wg.Wait()
|
||||
}
|
||||
mirrorWg.Wait()
|
||||
|
||||
select {
|
||||
case ccr := <-respCh:
|
||||
@@ -6054,6 +6045,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
|
||||
return nil
|
||||
}
|
||||
|
||||
// If here we will attempt to store the message.
|
||||
// Assume this will succeed.
|
||||
olmsgId := mset.lmsgId
|
||||
mset.lmsgId = msgId
|
||||
mset.lseq++
|
||||
tierName := mset.tier
|
||||
|
||||
// Republish state if needed.
|
||||
var tsubj string
|
||||
var tlseq uint64
|
||||
@@ -6077,7 +6075,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
|
||||
// If clustered this was already checked and we do not want to check here and possibly introduce skew.
|
||||
// Don't error and log if we're tracing when clustered.
|
||||
if !isClustered {
|
||||
if exceeded, err := jsa.wouldExceedLimits(stype, mset.tier, mset.cfg.Replicas, subject, hdr, msg); exceeded {
|
||||
if exceeded, err := jsa.wouldExceedLimits(stype, tierName, mset.cfg.Replicas, subject, hdr, msg); exceeded {
|
||||
if err == nil {
|
||||
err = NewJSAccountResourcesExceededError()
|
||||
}
|
||||
@@ -6136,7 +6134,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
|
||||
mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err)
|
||||
return err
|
||||
}
|
||||
// If we did not succeed increment clfs in case we are clustered.
|
||||
// If we did not succeed put those values back and increment clfs in case we are clustered.
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
mset.lseq = state.LastSeq
|
||||
mset.lmsgId = olmsgId
|
||||
bumpCLFS()
|
||||
|
||||
switch err {
|
||||
@@ -6157,8 +6159,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
|
||||
}
|
||||
|
||||
// If here we succeeded in storing the message.
|
||||
mset.lmsgId = msgId
|
||||
mset.lseq = seq
|
||||
|
||||
// If we have a msgId make sure to save.
|
||||
// This will replace our estimate from the cluster layer if we are clustered.
|
||||
@@ -7298,30 +7298,11 @@ func (mset *stream) checkInterestState() {
|
||||
return
|
||||
}
|
||||
|
||||
// Ensure only one of these runs at the same time.
|
||||
if !mset.cisrun.CompareAndSwap(false, true) {
|
||||
return
|
||||
}
|
||||
defer mset.cisrun.Store(false)
|
||||
|
||||
var ss StreamState
|
||||
mset.store.FastState(&ss)
|
||||
|
||||
asflr := uint64(math.MaxUint64)
|
||||
for _, o := range mset.getConsumers() {
|
||||
o.checkStateForInterestStream(&ss)
|
||||
o.mu.RLock()
|
||||
chkflr := o.chkflr
|
||||
o.mu.RUnlock()
|
||||
asflr = min(asflr, chkflr)
|
||||
}
|
||||
|
||||
mset.cfgMu.RLock()
|
||||
rp := mset.cfg.Retention
|
||||
mset.cfgMu.RUnlock()
|
||||
// Remove as many messages from the "head" of the stream if there's no interest anymore.
|
||||
if rp == InterestPolicy && asflr != math.MaxUint64 {
|
||||
mset.store.Compact(asflr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7408,18 +7389,20 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
|
||||
o.sigSubs = nil
|
||||
}
|
||||
|
||||
if mset.csl == nil {
|
||||
mset.csl = gsl.NewSublist[*consumer]()
|
||||
}
|
||||
// If no filters are present, add fwcs to sublist for that consumer.
|
||||
if newFilters == nil {
|
||||
mset.csl.Insert(fwcs, o)
|
||||
o.sigSubs = append(o.sigSubs, fwcs)
|
||||
} else {
|
||||
// If there are filters, add their subjects to sublist.
|
||||
for _, filter := range newFilters {
|
||||
mset.csl.Insert(filter, o)
|
||||
o.sigSubs = append(o.sigSubs, filter)
|
||||
if o.isLeader() {
|
||||
if mset.csl == nil {
|
||||
mset.csl = gsl.NewSublist[*consumer]()
|
||||
}
|
||||
// If no filters are preset, add fwcs to sublist for that consumer.
|
||||
if newFilters == nil {
|
||||
mset.csl.Insert(fwcs, o)
|
||||
o.sigSubs = append(o.sigSubs, fwcs)
|
||||
// If there are filters, add their subjects to sublist.
|
||||
} else {
|
||||
for _, filter := range newFilters {
|
||||
mset.csl.Insert(filter, o)
|
||||
o.sigSubs = append(o.sigSubs, filter)
|
||||
}
|
||||
}
|
||||
}
|
||||
o.mu.Unlock()
|
||||
@@ -7496,18 +7479,14 @@ func (mset *stream) partitionUnique(name string, partitions []string) bool {
|
||||
if n == name {
|
||||
continue
|
||||
}
|
||||
o.mu.RLock()
|
||||
if o.subjf == nil {
|
||||
o.mu.RUnlock()
|
||||
return false
|
||||
}
|
||||
for _, filter := range o.subjf {
|
||||
if SubjectsCollide(partition, filter.subject) {
|
||||
o.mu.RUnlock()
|
||||
return false
|
||||
}
|
||||
}
|
||||
o.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
||||
60
vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go
generated
vendored
60
vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go
generated
vendored
@@ -16,9 +16,6 @@ package stree
|
||||
import (
|
||||
"bytes"
|
||||
"slices"
|
||||
"unsafe"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server/gsl"
|
||||
)
|
||||
|
||||
// SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects.
|
||||
@@ -451,60 +448,3 @@ func LazyIntersect[TL, TR any](tl *SubjectTree[TL], tr *SubjectTree[TR], cb func
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// IntersectGSL will match all items in the given subject tree that
|
||||
// have interest expressed in the given sublist. The callback will only be called
|
||||
// once for each subject, regardless of overlapping subscriptions in the sublist.
|
||||
func IntersectGSL[T any, SL comparable](t *SubjectTree[T], sl *gsl.GenericSublist[SL], cb func(subject []byte, val *T)) {
|
||||
if t == nil || t.root == nil || sl == nil {
|
||||
return
|
||||
}
|
||||
var _pre [256]byte
|
||||
_intersectGSL(t.root, _pre[:0], sl, cb)
|
||||
}
|
||||
|
||||
func _intersectGSL[T any, SL comparable](n node, pre []byte, sl *gsl.GenericSublist[SL], cb func(subject []byte, val *T)) {
|
||||
if n.isLeaf() {
|
||||
ln := n.(*leaf[T])
|
||||
subj := append(pre, ln.suffix...)
|
||||
if sl.HasInterest(bytesToString(subj)) {
|
||||
cb(subj, &ln.value)
|
||||
}
|
||||
return
|
||||
}
|
||||
bn := n.base()
|
||||
pre = append(pre, bn.prefix...)
|
||||
for _, cn := range n.children() {
|
||||
if cn == nil {
|
||||
continue
|
||||
}
|
||||
subj := append(pre, cn.path()...)
|
||||
if !hasInterestForTokens(sl, subj, len(pre)) {
|
||||
continue
|
||||
}
|
||||
_intersectGSL(cn, pre, sl, cb)
|
||||
}
|
||||
}
|
||||
|
||||
// The subject tree can return partial tokens so we need to check starting interest
|
||||
// only from whole tokens when we encounter a tsep.
|
||||
func hasInterestForTokens[SL comparable](sl *gsl.GenericSublist[SL], subj []byte, since int) bool {
|
||||
for i := since; i < len(subj); i++ {
|
||||
if subj[i] == tsep {
|
||||
if !sl.HasInterestStartingIn(bytesToString(subj[:i])) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer.
|
||||
// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same.
|
||||
func bytesToString(b []byte) string {
|
||||
if len(b) == 0 {
|
||||
return ""
|
||||
}
|
||||
p := unsafe.SliceData(b)
|
||||
return unsafe.String(p, len(b))
|
||||
}
|
||||
|
||||
66
vendor/github.com/nats-io/nats-server/v2/server/sublist.go
generated
vendored
66
vendor/github.com/nats-io/nats-server/v2/server/sublist.go
generated
vendored
@@ -21,6 +21,8 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server/stree"
|
||||
)
|
||||
|
||||
// Sublist is a routing mechanism to handle subject distribution and
|
||||
@@ -816,9 +818,7 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool {
|
||||
*nq += len(qsub)
|
||||
}
|
||||
}
|
||||
if len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 {
|
||||
return true
|
||||
}
|
||||
return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0
|
||||
}
|
||||
if pwc != nil {
|
||||
if np != nil && nq != nil {
|
||||
@@ -1726,3 +1726,63 @@ func getAllNodes(l *level, results *SublistResult) {
|
||||
getAllNodes(n.next, results)
|
||||
}
|
||||
}
|
||||
|
||||
// IntersectStree will match all items in the given subject tree that
|
||||
// have interest expressed in the given sublist. The callback will only be called
|
||||
// once for each subject, regardless of overlapping subscriptions in the sublist.
|
||||
func IntersectStree[T any](st *stree.SubjectTree[T], sl *Sublist, cb func(subj []byte, entry *T)) {
|
||||
var _subj [255]byte
|
||||
intersectStree(st, sl.root, _subj[:0], cb)
|
||||
}
|
||||
|
||||
func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb func(subj []byte, entry *T)) {
|
||||
nsubj := subj
|
||||
if len(nsubj) > 0 {
|
||||
nsubj = append(subj, '.')
|
||||
}
|
||||
if r.fwc != nil {
|
||||
// We've reached a full wildcard, do a FWC match on the stree at this point
|
||||
// and don't keep iterating downward.
|
||||
nsubj := append(nsubj, '>')
|
||||
st.Match(nsubj, cb)
|
||||
return
|
||||
}
|
||||
if r.pwc != nil {
|
||||
// We've found a partial wildcard. We'll keep iterating downwards, but first
|
||||
// check whether there's interest at this level (without triggering dupes) and
|
||||
// match if so.
|
||||
var done bool
|
||||
nsubj := append(nsubj, '*')
|
||||
if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 {
|
||||
st.Match(nsubj, cb)
|
||||
done = true
|
||||
}
|
||||
if r.pwc.next.numNodes() > 0 {
|
||||
intersectStree(st, r.pwc.next, nsubj, cb)
|
||||
}
|
||||
if done {
|
||||
return
|
||||
}
|
||||
}
|
||||
// Normal node with subject literals, keep iterating.
|
||||
for t, n := range r.nodes {
|
||||
if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 {
|
||||
// A wildcard at the next level will already visit these descendents
|
||||
// so skip so we don't callback the same subject more than once.
|
||||
continue
|
||||
}
|
||||
nsubj := append(nsubj, t...)
|
||||
if len(n.psubs)+len(n.qsubs) > 0 {
|
||||
if subjectHasWildcard(bytesToString(nsubj)) {
|
||||
st.Match(nsubj, cb)
|
||||
} else {
|
||||
if e, ok := st.Find(nsubj); ok {
|
||||
cb(nsubj, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
if n.next.numNodes() > 0 {
|
||||
intersectStree(st, n.next, nsubj, cb)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
2
vendor/github.com/nats-io/nats-server/v2/server/util.go
generated
vendored
2
vendor/github.com/nats-io/nats-server/v2/server/util.go
generated
vendored
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2025 The NATS Authors
|
||||
// Copyright 2012-2024 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
||||
67
vendor/github.com/opencloud-eu/reva/v2/pkg/store/store.go
generated
vendored
67
vendor/github.com/opencloud-eu/reva/v2/pkg/store/store.go
generated
vendored
@@ -21,6 +21,7 @@ package store
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -32,6 +33,7 @@ import (
|
||||
"github.com/opencloud-eu/reva/v2/pkg/store/etcd"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/store/memory"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/store"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
@@ -125,19 +127,33 @@ func Create(opts ...microstore.Option) microstore.Store {
|
||||
return *ocMemStore
|
||||
case TypeNatsJS:
|
||||
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
|
||||
return natsjs.NewStore(
|
||||
store := natsjs.NewStore(
|
||||
append(opts,
|
||||
natsjs.NatsOptions(natsOptions), // always pass in properly initialized default nats options
|
||||
natsjs.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
|
||||
)
|
||||
|
||||
err := updateNatsStore(opts, ttl, natsOptions)
|
||||
if err != nil {
|
||||
options.Logger.Logf(logger.ErrorLevel, "failed to update nats-js store: '%s'", err.Error())
|
||||
}
|
||||
|
||||
return store
|
||||
case TypeNatsJSKV:
|
||||
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
|
||||
return natsjskv.NewStore(
|
||||
store := natsjskv.NewStore(
|
||||
append(opts,
|
||||
natsjskv.NatsOptions(natsOptions), // always pass in properly initialized default nats options
|
||||
natsjskv.EncodeKeys(), // nats has restrictions on the key, we cannot use slashes
|
||||
natsjskv.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
|
||||
)
|
||||
|
||||
err := updateNatsStore(opts, ttl, natsOptions)
|
||||
if err != nil {
|
||||
options.Logger.Logf(logger.ErrorLevel, "failed to update nats-js-kv store: '%s'", err.Error())
|
||||
}
|
||||
|
||||
return store
|
||||
case TypeMemory, "mem", "": // allow existing short form and use as default
|
||||
return microstore.NewMemoryStore(opts...)
|
||||
default:
|
||||
@@ -146,13 +162,58 @@ func Create(opts ...microstore.Option) microstore.Store {
|
||||
}
|
||||
}
|
||||
|
||||
func updateNatsStore(opts []store.Option, ttl time.Duration, natsOptions nats.Options) error {
|
||||
options := store.Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
bucketName := options.Database
|
||||
if bucketName == "" {
|
||||
return fmt.Errorf("bucket name (database) must be set")
|
||||
}
|
||||
|
||||
if len(options.Nodes) > 0 {
|
||||
natsOptions.Servers = options.Nodes
|
||||
}
|
||||
nc, err := natsOptions.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not connect to nats: %w", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
js, err := nc.JetStream()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// NATS KV buckets are actually streams named "KV_<bucket_name>"
|
||||
info, err := js.StreamInfo("KV_" + bucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get bucket info: %w", err)
|
||||
}
|
||||
|
||||
config := info.Config
|
||||
config.MaxAge = ttl
|
||||
|
||||
_, err = js.UpdateStream(&config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update bucket TTL: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func natsConfig(log logger.Logger, ctx context.Context, opts []microstore.Option) ([]microstore.Option, time.Duration, nats.Options) {
|
||||
|
||||
if mem, _ := ctx.Value(disablePersistanceContextKey{}).(bool); mem {
|
||||
opts = append(opts, natsjs.DefaultMemory())
|
||||
}
|
||||
|
||||
ttl, _ := ctx.Value(ttlContextKey{}).(time.Duration)
|
||||
ttl := time.Duration(0)
|
||||
if d, ok := ctx.Value(ttlContextKey{}).(time.Duration); ok {
|
||||
ttl = d
|
||||
}
|
||||
|
||||
// preparing natsOptions before the switch to reuse the same code
|
||||
natsOptions := nats.GetDefaultOptions()
|
||||
|
||||
8
vendor/modules.txt
vendored
8
vendor/modules.txt
vendored
@@ -746,7 +746,7 @@ github.com/google/go-querystring/query
|
||||
# github.com/google/go-tika v0.3.1
|
||||
## explicit; go 1.11
|
||||
github.com/google/go-tika/tika
|
||||
# github.com/google/go-tpm v0.9.8
|
||||
# github.com/google/go-tpm v0.9.7
|
||||
## explicit; go 1.22
|
||||
github.com/google/go-tpm/legacy/tpm2
|
||||
github.com/google/go-tpm/tpmutil
|
||||
@@ -873,7 +873,7 @@ github.com/justinas/alice
|
||||
# github.com/kevinburke/ssh_config v1.2.0
|
||||
## explicit
|
||||
github.com/kevinburke/ssh_config
|
||||
# github.com/klauspost/compress v1.18.3
|
||||
# github.com/klauspost/compress v1.18.2
|
||||
## explicit; go 1.23
|
||||
github.com/klauspost/compress
|
||||
github.com/klauspost/compress/flate
|
||||
@@ -1155,7 +1155,7 @@ github.com/munnerz/goautoneg
|
||||
# github.com/nats-io/jwt/v2 v2.8.0
|
||||
## explicit; go 1.23.0
|
||||
github.com/nats-io/jwt/v2
|
||||
# github.com/nats-io/nats-server/v2 v2.12.4
|
||||
# github.com/nats-io/nats-server/v2 v2.12.3
|
||||
## explicit; go 1.24.0
|
||||
github.com/nats-io/nats-server/v2/conf
|
||||
github.com/nats-io/nats-server/v2/internal/fastrand
|
||||
@@ -1376,7 +1376,7 @@ github.com/opencloud-eu/icap-client
|
||||
# github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
|
||||
## explicit; go 1.18
|
||||
github.com/opencloud-eu/libre-graph-api-go
|
||||
# github.com/opencloud-eu/reva/v2 v2.42.0
|
||||
# github.com/opencloud-eu/reva/v2 v2.42.1
|
||||
## explicit; go 1.24.1
|
||||
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
|
||||
github.com/opencloud-eu/reva/v2/cmd/revad/runtime
|
||||
|
||||
Reference in New Issue
Block a user