build(deps): bump github.com/nats-io/nats-server/v2

Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.9 to 2.10.10.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.10.9...v2.10.10)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2024-02-06 14:50:45 +00:00
committed by Ralf Haferkamp
parent dcc17b3994
commit 892660433f
37 changed files with 1766 additions and 339 deletions

4
go.mod
View File

@@ -59,7 +59,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.2.1
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.9
github.com/nats-io/nats-server/v2 v2.10.10
github.com/nats-io/nats.go v1.32.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
@@ -249,7 +249,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/juliangruber/go-intersect v1.1.0 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/compress v1.17.5 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/libregraph/oidc-go v1.0.0 // indirect

8
go.sum
View File

@@ -1588,8 +1588,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E=
github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -1745,8 +1745,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T17HWtwiFI=
github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ=
github.com/nats-io/nats-server/v2 v2.10.10 h1:g1Wd64J5SGsoqWSx1qoNu9/At7a2x+jE7Qtf2XpEx/I=
github.com/nats-io/nats-server/v2 v2.10.10/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=

View File

@@ -212,7 +212,7 @@ func (d *compressor) writeBlockSkip(tok *tokens, index int, eof bool) error {
// Should only be used after a start/reset.
func (d *compressor) fillWindow(b []byte) {
// Do not fill window if we are in store-only or huffman mode.
if d.level <= 0 {
if d.level <= 0 && d.level > -MinCustomWindowSize {
return
}
if d.fast != nil {

View File

@@ -0,0 +1,13 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !race
package race
// ReadSlice is a no-op: this file is compiled only when the race
// detector is disabled (//go:build !race), so there is nothing to
// report to the runtime.
func ReadSlice[T any](s []T) {
}
// WriteSlice is a no-op: this file is compiled only when the race
// detector is disabled (//go:build !race), so there is nothing to
// report to the runtime.
func WriteSlice[T any](s []T) {
}

View File

@@ -0,0 +1,26 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build race
package race
import (
"runtime"
"unsafe"
)
// ReadSlice informs the race detector that the entire backing array of s
// is being read, by reporting the byte range [&s[0], &s[0]+len*sizeof(T)).
// This lets races be caught even when the actual reads happen in assembly
// the detector cannot instrument.
func ReadSlice[T any](s []T) {
// Empty slices have no element to take the address of; nothing to report.
if len(s) == 0 {
return
}
runtime.RaceReadRange(unsafe.Pointer(&s[0]), len(s)*int(unsafe.Sizeof(s[0])))
}
// WriteSlice informs the race detector that the entire backing array of s
// is being written, by reporting the byte range [&s[0], &s[0]+len*sizeof(T)).
// This lets races be caught even when the actual writes happen in assembly
// the detector cannot instrument.
func WriteSlice[T any](s []T) {
// Empty slices have no element to take the address of; nothing to report.
if len(s) == 0 {
return
}
runtime.RaceWriteRange(unsafe.Pointer(&s[0]), len(s)*int(unsafe.Sizeof(s[0])))
}

View File

@@ -10,6 +10,8 @@ import (
"errors"
"fmt"
"strconv"
"github.com/klauspost/compress/internal/race"
)
var (
@@ -63,6 +65,10 @@ func Decode(dst, src []byte) ([]byte, error) {
} else {
dst = make([]byte, dLen)
}
race.WriteSlice(dst)
race.ReadSlice(src[s:])
if s2Decode(dst, src[s:]) != 0 {
return nil, ErrCorrupt
}

View File

@@ -3,6 +3,8 @@
package s2
import "github.com/klauspost/compress/internal/race"
const hasAmd64Asm = true
// encodeBlock encodes a non-empty src to a guaranteed-large-enough dst. It
@@ -14,6 +16,9 @@ const hasAmd64Asm = true
// len(dst) >= MaxEncodedLen(len(src)) &&
// minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize
func encodeBlock(dst, src []byte) (d int) {
race.ReadSlice(src)
race.WriteSlice(dst)
const (
// Use 12 bit table when less than...
limit12B = 16 << 10
@@ -50,6 +55,9 @@ func encodeBlock(dst, src []byte) (d int) {
// len(dst) >= MaxEncodedLen(len(src)) &&
// minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize
func encodeBlockBetter(dst, src []byte) (d int) {
race.ReadSlice(src)
race.WriteSlice(dst)
const (
// Use 12 bit table when less than...
limit12B = 16 << 10
@@ -86,6 +94,9 @@ func encodeBlockBetter(dst, src []byte) (d int) {
// len(dst) >= MaxEncodedLen(len(src)) &&
// minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize
func encodeBlockSnappy(dst, src []byte) (d int) {
race.ReadSlice(src)
race.WriteSlice(dst)
const (
// Use 12 bit table when less than...
limit12B = 16 << 10
@@ -121,6 +132,9 @@ func encodeBlockSnappy(dst, src []byte) (d int) {
// len(dst) >= MaxEncodedLen(len(src)) &&
// minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize
func encodeBlockBetterSnappy(dst, src []byte) (d int) {
race.ReadSlice(src)
race.WriteSlice(dst)
const (
// Use 12 bit table when less than...
limit12B = 16 << 10

View File

@@ -104,12 +104,14 @@ func ReaderIgnoreStreamIdentifier() ReaderOption {
// For each chunk with the ID, the callback is called with the content.
// Any returned non-nil error will abort decompression.
// Only one callback per ID is supported, latest sent will be used.
// You can peek the stream, triggering the callback, by doing a Read with a 0
// byte buffer.
func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
return func(r *Reader) error {
if id < 0x80 || id > 0xfd {
return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
}
r.skippableCB[id] = fn
r.skippableCB[id-0x80] = fn
return nil
}
}
@@ -128,7 +130,7 @@ type Reader struct {
err error
decoded []byte
buf []byte
skippableCB [0x80]func(r io.Reader) error
skippableCB [0xff - 0x80]func(r io.Reader) error
blockStart int64 // Uncompressed offset at start of current.
index *Index
@@ -201,7 +203,7 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
// The supplied slice does not need to be the size of the read.
func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
if id < 0x80 {
r.err = fmt.Errorf("interbal error: skippable id < 0x80")
r.err = fmt.Errorf("internal error: skippable id < 0x80")
return false
}
if fn := r.skippableCB[id-0x80]; fn != nil {
@@ -1048,15 +1050,17 @@ func (r *Reader) ReadByte() (byte, error) {
}
// SkippableCB will register a callback for chunks with the specified ID.
// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive).
// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
// For each chunk with the ID, the callback is called with the content.
// Any returned non-nil error will abort decompression.
// Only one callback per ID is supported, latest sent will be used.
// Sending a nil function will disable previous callbacks.
// You can peek the stream, triggering the callback, by doing a Read with a 0
// byte buffer.
func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
if id < 0x80 || id > chunkTypePadding {
if id < 0x80 || id >= chunkTypePadding {
return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
}
r.skippableCB[id] = fn
r.skippableCB[id-0x80] = fn
return nil
}

View File

@@ -37,6 +37,8 @@ package s2
import (
"bytes"
"hash/crc32"
"github.com/klauspost/compress/internal/race"
)
/*
@@ -112,6 +114,8 @@ var crcTable = crc32.MakeTable(crc32.Castagnoli)
// crc implements the checksum specified in section 3 of
// https://github.com/google/snappy/blob/master/framing_format.txt
// (CRC-32C over b, then masked so the checksum of data containing
// embedded CRCs does not look like a CRC itself).
func crc(b []byte) uint32 {
// Report the full read of b to the race detector (no-op in !race builds).
race.ReadSlice(b)
c := crc32.Update(0, crcTable, b)
// Mask per the Snappy framing format: rotate right by 15 bits, then add
// a constant. Note + and | share precedence in Go and associate left,
// so this parses as ((c>>15) | (c<<17)) + 0xa282ead8.
return c>>15 | c<<17 + 0xa282ead8
}

View File

@@ -13,6 +13,8 @@ import (
"io"
"runtime"
"sync"
"github.com/klauspost/compress/internal/race"
)
const (
@@ -271,7 +273,7 @@ func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
return fmt.Errorf("skippable block excessed maximum size")
}
var header [4]byte
chunkLen := 4 + len(data)
chunkLen := len(data)
header[0] = id
header[1] = uint8(chunkLen >> 0)
header[2] = uint8(chunkLen >> 8)
@@ -282,7 +284,7 @@ func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
if err = w.err(err); err != nil {
return err
}
if n != len(data) {
if n != len(b) {
return w.err(io.ErrShortWrite)
}
w.written += int64(n)
@@ -303,9 +305,7 @@ func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
if err := write(header[:]); err != nil {
return err
}
if err := write(data); err != nil {
return err
}
return write(data)
}
// Create output...
@@ -385,6 +385,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
buf = buf[len(uncompressed):]
// Get an output buffer.
obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
race.WriteSlice(obuf)
output := make(chan result)
// Queue output now, so we keep order.
w.output <- output
@@ -393,6 +395,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
}
w.uncompWritten += int64(len(uncompressed))
go func() {
race.ReadSlice(uncompressed)
checksum := crc(uncompressed)
// Set to uncompressed.

View File

@@ -1877,6 +1877,10 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
rt := Singleton
var lat *serviceLatency
if dest == nil {
return nil, ErrMissingAccount
}
dest.mu.RLock()
se := dest.getServiceExport(to)
if se != nil {

View File

@@ -230,7 +230,6 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
}
}
}
return targetAcc, nil
}
@@ -270,6 +269,14 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
return
}
// the JWT is cleared, because if in operator mode it may hold the JWT
// for the bearer token that connected to the callout if in operator mode
// the permissions are already set on the client, this prevents a decode
// on c.RegisterNKeyUser which would have wrong values
c.mu.Lock()
c.opts.JWT = _EMPTY_
c.mu.Unlock()
// Build internal user and bind to the targeted account.
nkuser := buildInternalNkeyUser(arc, allowedConnTypes, targetAcc)
if err := c.RegisterNkeyUser(nkuser); err != nil {

View File

@@ -4089,6 +4089,12 @@ func getHeader(key string, hdr []byte) []byte {
return value
}
// For bytes.HasPrefix below.
var (
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
)
// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) {
@@ -4110,8 +4116,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
if !shouldReturn && !isResponse && si.to == jsAllAPI {
subj := bytesToString(c.pa.subject)
if strings.HasPrefix(subj, jsRequestNextPre) || strings.HasPrefix(subj, jsDirectGetPre) {
if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) {
checkJS = true
}
}
@@ -4790,7 +4795,11 @@ func (c *client) processPingTimer() {
var sendPing bool
pingInterval := c.srv.getOpts().PingInterval
opts := c.srv.getOpts()
pingInterval := opts.PingInterval
if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
pingInterval = opts.Cluster.PingInterval
}
pingInterval = adjustPingInterval(c.kind, pingInterval)
now := time.Now()
needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL
@@ -4810,7 +4819,11 @@ func (c *client) processPingTimer() {
if sendPing {
// Check for violation
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
maxPingsOut := opts.MaxPingsOut
if c.kind == ROUTER && opts.Cluster.MaxPingsOut > 0 {
maxPingsOut = opts.Cluster.MaxPingsOut
}
if c.ping.out+1 > maxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection")))
c.mu.Unlock()
@@ -4847,7 +4860,11 @@ func (c *client) setPingTimer() {
if c.srv == nil {
return
}
d := c.srv.getOpts().PingInterval
opts := c.srv.getOpts()
d := opts.PingInterval
if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
d = opts.Cluster.PingInterval
}
d = adjustPingInterval(c.kind, d)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}
@@ -5740,6 +5757,9 @@ func (c *client) setFirstPingTimer() {
opts := s.getOpts()
d := opts.PingInterval
if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
d = opts.Cluster.PingInterval
}
if !opts.DisableShortFirstPing {
if c.kind != CLIENT {
if d > firstPingInterval {

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.10.9"
VERSION = "2.10.10"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -4384,8 +4384,9 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)
}
if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
p.Timestamp = time.Now().UnixNano()
p.Sequence = dseq
} else {
o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()}
}
@@ -4606,6 +4607,8 @@ func (o *consumer) checkPending() {
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
// Mimic behavior in processAckMsg when pending is empty.
o.adflr, o.asflr = o.dseq-1, o.sseq-1
}
// Update our state if needed.
@@ -4936,6 +4939,7 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
// This means we can reset everything at this point.
if len(o.pending) == 0 {
o.pending, o.rdc = nil, nil
o.adflr, o.asflr = o.dseq-1, o.sseq-1
}
// We need to remove all those being queued for redelivery under o.rdq
@@ -5330,33 +5334,42 @@ func (o *consumer) isMonitorRunning() bool {
return o.inMonitor
}
// If we detect that our ackfloor is higher than the stream's last sequence, return this error.
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")
// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream() {
o.mu.Lock()
func (o *consumer) checkStateForInterestStream() error {
o.mu.RLock()
// See if we need to process this update if our parent stream is not a limits policy stream.
mset := o.mset
shouldProcessState := mset != nil && o.retention != LimitsPolicy
if o.closed || !shouldProcessState {
o.mu.Unlock()
return
o.mu.RUnlock()
return nil
}
state, err := o.store.State()
o.mu.Unlock()
o.mu.RUnlock()
if err != nil {
return
return err
}
asflr := state.AckFloor.Stream
// Protect ourselves against rolling backwards.
if asflr&(1<<63) != 0 {
return
return nil
}
// We should make sure to update the acks.
var ss StreamState
mset.store.FastState(&ss)
// Check if the underlying stream's last sequence is less than our floor.
// This can happen if the stream has been reset and has not caught up yet.
if asflr > ss.LastSeq {
return errAckFloorHigherThanLastSeq
}
for seq := ss.FirstSeq; seq <= asflr; seq++ {
mset.ackMsg(o, seq)
}
@@ -5374,4 +5387,5 @@ func (o *consumer) checkStateForInterestStream() {
}
}
}
return nil
}

View File

@@ -1625,7 +1625,6 @@ func (s *Server) shutdownEventing() {
// internal send loop to exit.
s.sendShutdownEvent()
wg.Wait()
close(rc)
s.mu.Lock()
defer s.mu.Unlock()
@@ -1637,6 +1636,9 @@ func (s *Server) shutdownEventing() {
})
// Turn everything off here.
s.sys = nil
// Make sure this is done after s.sys = nil, so that we don't
// get sends to closed channels on badly-timed config reloads.
close(rc)
}
// Request for our local connection count.

View File

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2023 The NATS Authors
// Copyright 2019-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -653,9 +653,15 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error {
}
if !a.serviceImportExists(jsAllAPI) {
if err := a.AddServiceImport(s.SystemAccount(), jsAllAPI, _EMPTY_); err != nil {
// Capture si so we can turn on implicit sharing with JetStream layer.
// Make sure to set "to" otherwise will incur performance slow down.
si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, jsAllAPI, nil)
if err != nil {
return fmt.Errorf("Error setting up jetstream service imports for account: %v", err)
}
a.mu.Lock()
si.share = true
a.mu.Unlock()
}
// Check if we have a Domain specified.

View File

@@ -334,8 +334,8 @@ func generateJSMappingTable(domain string) map[string]string {
// JSMaxDescription is the maximum description length for streams and consumers.
const JSMaxDescriptionLen = 4 * 1024
// JSMaxMetadataLen is the maximum length for streams an consumers metadata map.
// It's calculated by summing length of all keys an values.
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
// It's calculated by summing length of all keys and values.
const JSMaxMetadataLen = 128 * 1024
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2023 The NATS Authors
// Copyright 2020-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -432,10 +432,10 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
}
// Restart the stream in question.
// Should only be called when the stream is know in a bad state.
// Should only be called when the stream is known to be in a bad state.
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
cc := js.cluster
s, cc := js.srv, js.cluster
if cc == nil {
js.mu.Unlock()
return
@@ -458,9 +458,18 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
}
rg.node = nil
}
sinceCreation := time.Since(sa.Created)
js.mu.Unlock()
// Process stream assignment to recreate.
// Check that we have given system enough time to start us up.
// This will be longer than obvious, and matches consumer logic in case system very busy.
if sinceCreation < 10*time.Second {
s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v",
acc, csa.Config.Name, sinceCreation)
return
}
js.processStreamAssignment(sa)
// If we had consumers assigned to this server they will be present in the copy, csa.
@@ -569,13 +578,24 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
js.mu.Lock()
deleted := ca.deleted
// Check that we have not just been created.
if !deleted && time.Since(ca.Created) < 10*time.Second {
s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v",
accName, streamName, consumer, time.Since(ca.Created))
js.mu.Unlock()
return
}
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
deleted := ca.deleted
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
@@ -597,7 +617,7 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
s.Warnf("Detected consumer cluster node skew '%s > %s'", accName, streamName, consumer)
s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()
restartConsumer()
@@ -1123,6 +1143,7 @@ func (js *jetStream) checkForOrphans() {
for accName, jsa := range js.accounts {
asa := cc.streams[accName]
jsa.mu.RLock()
for stream, mset := range jsa.streams {
if sa := asa[stream]; sa == nil {
streams = append(streams, mset)
@@ -1136,6 +1157,7 @@ func (js *jetStream) checkForOrphans() {
}
}
}
jsa.mu.RUnlock()
}
js.mu.Unlock()
@@ -1356,7 +1378,7 @@ func (js *jetStream) monitorCluster() {
case isLeader = <-lch:
// For meta layer synchronize everyone to our state on becoming leader.
if isLeader {
if isLeader && n.ApplyQ().len() == 0 {
n.SendSnapshot(js.metaSnapshot())
}
// Process the change.
@@ -2233,7 +2255,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
lastState, lastSnapTime = curState, time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}
@@ -2298,7 +2320,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if mset.numConsumers() >= numExpectedConsumers {
break
}
time.Sleep(sleepTime)
select {
case <-s.quitCh:
return
case <-time.After(sleepTime):
}
}
if actual := mset.numConsumers(); actual < numExpectedConsumers {
s.Warnf("All consumers not online for '%s > %s': expected %d but only have %d", accName, mset.name(), numExpectedConsumers, actual)
@@ -2311,7 +2337,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// We can arrive here NOT being the leader, so we send the snapshot only if we are, and in this case
// reset the notion that we need to send the snapshot. If we are not, then the first time the server
// will switch to leader (in the loop below), we will send the snapshot.
if sendSnapshot && isLeader && mset != nil && n != nil {
if sendSnapshot && isLeader && mset != nil && n != nil && !isRecovering {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
@@ -2331,9 +2357,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// No special processing needed for when we are caught up on restart.
if ce == nil {
isRecovering = false
// Check on startup if we should snapshot/compact.
if _, b := n.Size(); b > compactSizeMin || n.NeedSnapshot() {
doSnapshot()
// Make sure we create a new snapshot in case things have changed such that any existing
// snapshot may no longer be valid.
doSnapshot()
// If we became leader during this time and we need to send a snapshot to our
// followers, i.e. as a result of a scale-up from R1, do it now.
if sendSnapshot && isLeader && mset != nil && n != nil {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
continue
}
@@ -2374,7 +2405,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
case isLeader = <-lch:
if isLeader {
if mset != nil && n != nil && sendSnapshot {
if mset != nil && n != nil && sendSnapshot && !isRecovering {
// If we *are* recovering at the time then this will get done when the apply queue
// handles the nil guard to show the catchup ended.
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
@@ -2841,6 +2874,19 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Process the actual message here.
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
if err == errLastSeqMismatch {
var state StreamState
mset.store.FastState(&state)
// If we have no msgs and the other side is delivering us a sequence past where we
// should be reset. This is possible if the other side has a stale snapshot and no longer
// has those messages. So compact and retry to reset.
if state.Msgs == 0 {
mset.store.Compact(lseq + 1)
// Retry
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts)
}
}
// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed {
return err
@@ -4086,6 +4132,7 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
// Make sure this removal is for what we have, otherwise ignore.
if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
needDelete = true
oca.deleted = true
delete(sa.consumers, ca.Name)
}
}
@@ -4366,7 +4413,6 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
recovering := ca.recovering
js.mu.RUnlock()
stopped := false
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
var err error
var acc *Account
@@ -4376,9 +4422,13 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
if mset, _ := acc.lookupStream(ca.Stream); mset != nil {
if o := mset.lookupConsumer(ca.Name); o != nil {
err = o.stopWithFlags(true, false, true, wasLeader)
stopped = true
}
}
} else if ca.Group != nil {
// We have a missing account, see if we can cleanup.
if sacc := s.SystemAccount(); sacc != nil {
os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name))
}
}
// Always delete the node if present.
@@ -4386,15 +4436,6 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
node.Delete()
}
// This is a stop gap cleanup in case
// 1) the account does not exist (and mset consumer couldn't be stopped) and/or
// 2) node was nil (and couldn't be deleted)
if !stopped || node == nil {
if sacc := s.SystemAccount(); sacc != nil {
os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name))
}
}
if !wasLeader || ca.Reply == _EMPTY_ {
if !(offline && isMetaLeader) {
return
@@ -4595,8 +4636,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
s.RateLimitWarnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
}
}
}
@@ -4658,16 +4699,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
js.setConsumerAssignmentRecovering(ca)
}
// Synchronize everyone to our state.
if isLeader && n != nil {
// Only send out if we have state.
if _, _, applied := n.Progress(); applied > 0 {
if snap, err := o.store.EncodedState(); err == nil {
n.SendSnapshot(snap)
}
}
}
// Process the change.
if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
doSnapshot(true)
@@ -7789,6 +7820,7 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
mset.store.FastState(&state)
mset.setCLFS(snap.Failed)
sreq := mset.calculateSyncRequest(&state, snap)
s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
mset.mu.Unlock()

View File

@@ -18,7 +18,7 @@ import (
"time"
)
func (s *Server) publishAdvisory(acc *Account, subject string, adv interface{}) {
func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
if acc == nil {
acc = s.SystemAccount()
if acc == nil {

View File

@@ -22,6 +22,7 @@ import (
"time"
"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nats-server/v2/server/stree"
)
// TODO(dlc) - This is a fairly simplistic approach but should do for now.
@@ -30,7 +31,7 @@ type memStore struct {
cfg StreamConfig
state StreamState
msgs map[uint64]*StoreMsg
fss map[string]*SimpleState
fss *stree.SubjectTree[SimpleState]
dmap avl.SequenceSet
maxp int64
scb StorageUpdateHandler
@@ -48,7 +49,7 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
}
ms := &memStore{
msgs: make(map[uint64]*StoreMsg),
fss: make(map[string]*SimpleState),
fss: stree.NewSubjectTree[SimpleState](),
maxp: cfg.MaxMsgsPer,
cfg: *cfg,
}
@@ -88,11 +89,12 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
lm := uint64(ms.maxp)
for subj, ss := range ms.fss {
ms.fss.Iter(func(subj []byte, ss *SimpleState) bool {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(subj, ss)
ms.enforcePerSubjectLimit(bytesToString(subj), ss)
}
}
return true
})
}
ms.mu.Unlock()
@@ -113,7 +115,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
var ss *SimpleState
var asl bool
if len(subj) > 0 {
if ss = ms.fss[subj]; ss != nil {
var ok bool
if ss, ok = ms.fss.Find(stringToBytes(subj)); ok {
asl = ms.maxp > 0 && ss.Msgs >= uint64(ms.maxp)
}
}
@@ -191,7 +194,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
ms.enforcePerSubjectLimit(subj, ss)
}
} else {
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
ms.fss.Insert([]byte(subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
}
@@ -363,14 +366,17 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
return ss
}
isAll := filter == _EMPTY_ || filter == fwcs
if filter == _EMPTY_ {
filter = fwcs
}
isAll := filter == fwcs
// First check if we can optimize this part.
// This means we want all and the starting sequence was before this block.
if isAll && sseq <= ms.state.FirstSeq {
total := ms.state.Msgs
if lastPerSubject {
total = uint64(len(ms.fss))
total = uint64(ms.fss.Size())
}
return SimpleState{
Msgs: total,
@@ -415,21 +421,20 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
var havePartial bool
// We will track start and end sequences as we go.
for subj, fss := range ms.fss {
if isMatch(subj) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
// We matched but its a partial.
havePartial = true
// Don't break here, we will update to keep tracking last.
update(fss)
}
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
subjs := bytesToString(subj)
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, fss.First, fss)
}
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
// We matched but its a partial.
havePartial = true
// Don't break here, we will update to keep tracking last.
update(fss)
}
})
// If we did not encounter any partials we can return here.
if !havePartial {
@@ -476,7 +481,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
for seq := ms.state.FirstSeq; seq < first; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
if lastPerSubject {
tss = ms.fss[sm.subj]
tss, _ = ms.fss.Find(stringToBytes(sm.subj))
}
// If we are last per subject, make sure to only adjust if all messages are before our first.
if tss == nil || tss.Last < first {
@@ -515,26 +520,29 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
ms.mu.RLock()
defer ms.mu.RUnlock()
if len(ms.fss) == 0 {
if ms.fss.Size() == 0 {
return nil
}
fss := make(map[string]SimpleState)
for subj, ss := range ms.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subj] = oss
}
}
if subject == _EMPTY_ {
subject = fwcs
}
fss := make(map[string]SimpleState)
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, ss.First, ss)
}
oss := fss[subjs]
if oss.First == 0 { // New
fss[subjs] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subjs] = oss
}
})
return fss
}
@@ -543,7 +551,7 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
ms.mu.RLock()
defer ms.mu.RUnlock()
if len(ms.fss) == 0 {
if ms.fss.Size() == 0 {
return nil
}
@@ -553,15 +561,16 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
isAll := filterSubject == _EMPTY_ || filterSubject == fwcs
fst := make(map[string]uint64)
for subj, ss := range ms.fss {
ms.fss.Match(stringToBytes(filterSubject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if isAll {
fst[subj] = ss.Msgs
fst[subjs] = ss.Msgs
} else {
if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) {
fst[subj] = ss.Msgs
if tts := tokenizeSubjectIntoSlice(tsa[:0], subjs); isSubsetMatchTokenized(tts, fts) {
fst[subjs] = ss.Msgs
}
}
}
})
return fst
}
@@ -755,7 +764,7 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.state.Bytes = 0
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = make(map[string]*SimpleState)
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.mu.Unlock()
if cb != nil {
@@ -846,7 +855,7 @@ func (ms *memStore) reset() error {
ms.state.Bytes = 0
// Reset msgs and fss.
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = make(map[string]*SimpleState)
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.mu.Unlock()
@@ -950,7 +959,8 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error
if subject == _EMPTY_ || subject == fwcs {
sm, ok = ms.msgs[ms.state.LastSeq]
} else if subjectIsLiteral(subject) {
if ss := ms.fss[subject]; ss != nil && ss.Msgs > 0 {
var ss *SimpleState
if ss, ok = ms.fss.Find(stringToBytes(subject)); ok && ss.Msgs > 0 {
sm, ok = ms.msgs[ss.Last]
}
} else if ss := ms.filteredStateLocked(1, subject, true); ss.Msgs > 0 {
@@ -982,12 +992,15 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
return nil, ms.state.LastSeq, ErrStoreEOF
}
isAll := filter == _EMPTY_ || filter == fwcs
if filter == _EMPTY_ {
filter = fwcs
}
isAll := filter == fwcs
// Skip scan of ms.fss is number of messages in the block are less than
// 1/2 the number of subjects in ms.fss. Or we have a wc and lots of fss entries.
const linearScanMaxFSS = 256
doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < len(ms.fss) || (wc && len(ms.fss) > linearScanMaxFSS)
doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < ms.fss.Size() || (wc && ms.fss.Size() > linearScanMaxFSS)
// Initial setup.
fseq, lseq := start, ms.state.LastSeq
@@ -996,16 +1009,14 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
subs := []string{filter}
if wc || isAll {
subs = subs[:0]
for fsubj := range ms.fss {
if isAll || subjectIsSubsetMatch(fsubj, filter) {
subs = append(subs, fsubj)
}
}
ms.fss.Match(stringToBytes(filter), func(subj []byte, val *SimpleState) {
subs = append(subs, string(subj))
})
}
fseq, lseq = ms.state.LastSeq, uint64(0)
for _, subj := range subs {
ss := ms.fss[subj]
if ss == nil {
ss, ok := ms.fss.Find(stringToBytes(subj))
if !ok {
continue
}
if ss.firstNeedsUpdate {
@@ -1093,12 +1104,12 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
// Remove a seq from the fss and select new first.
// Lock should be held.
func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
ss := ms.fss[subj]
if ss == nil {
ss, ok := ms.fss.Find(stringToBytes(subj))
if !ok {
return
}
if ss.Msgs == 1 {
delete(ms.fss, subj)
ms.fss.Delete(stringToBytes(subj))
return
}
ss.Msgs--
@@ -1198,7 +1209,7 @@ func (ms *memStore) FastState(state *StreamState) {
}
}
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
state.NumSubjects = ms.fss.Size()
ms.mu.RUnlock()
}
@@ -1208,7 +1219,7 @@ func (ms *memStore) State() StreamState {
state := ms.state
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
state.NumSubjects = ms.fss.Size()
state.Deleted = nil
// Calculate interior delete details.

View File

@@ -1439,26 +1439,31 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
}
}
// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
// before. Ignore any errors that might arise.
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName)
// Using ephemeral consumer is too risky because if this server were to be
// disconnected from the rest for few seconds, then the leader would remove
// the consumer, so even after a reconnect, we would no longer receive
// retained messages. Delete any existing durable that we have for that
// and recreate here.
// The name for the durable is $MQTT_rmsgs_<server name hash> (which is jsa.id)
rmDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
// If error other than "not found" then fail, otherwise proceed with creating
// the durable consumer.
if _, err := jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmDurName); isErrorOtherThan(err, JSConsumerNotFoundErr) {
return nil, err
}
// retained messages.
//
// So we use a durable consumer, and create a new one each time we start.
// The old one should expire and get deleted due to inactivity. The name for
// the durable is $MQTT_rmsgs_{uuid}_{server-name}, the server name is just
// for readability.
rmDurName := mqttRetainedMsgsStreamName + "_" + nuid.Next() + "_" + s.String()
ccfg := &CreateConsumerRequest{
Stream: mqttRetainedMsgsStreamName,
Config: ConsumerConfig{
Durable: rmDurName,
FilterSubject: mqttRetainedMsgsStreamSubject + ">",
DeliverSubject: rmsubj,
ReplayPolicy: ReplayInstant,
AckPolicy: AckNone,
Durable: rmDurName,
FilterSubject: mqttRetainedMsgsStreamSubject + ">",
DeliverSubject: rmsubj,
ReplayPolicy: ReplayInstant,
AckPolicy: AckNone,
InactiveThreshold: 5 * time.Minute,
},
}
if _, err := jsa.createConsumer(ccfg); err != nil {

View File

@@ -80,6 +80,8 @@ type ClusterOpts struct {
PoolSize int `json:"-"`
PinnedAccounts []string `json:"-"`
Compression CompressionOpts `json:"-"`
PingInterval time.Duration `json:"-"`
MaxPingsOut int `json:"-"`
// Not exported (used in tests)
resolver netResolver
@@ -1755,6 +1757,13 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err
*errors = append(*errors, err)
continue
}
case "ping_interval":
opts.Cluster.PingInterval = parseDuration("ping_interval", tk, mv, errors, warnings)
if opts.Cluster.PingInterval > routeMaxPingInterval {
*warnings = append(*warnings, &configErr{tk, fmt.Sprintf("Cluster 'ping_interval' will reset to %v which is the max for routes", routeMaxPingInterval)})
}
case "ping_max":
opts.Cluster.MaxPingsOut = int(mv.(int64))
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{

View File

@@ -154,11 +154,12 @@ type raft struct {
llqrt time.Time // Last quorum lost time
lsut time.Time // Last scale-up time
term uint64 // The current vote term
pterm uint64 // Previous term from the last snapshot
pindex uint64 // Previous index from the last snapshot
commit uint64 // Sequence number of the most recent commit
applied uint64 // Sequence number of the most recently applied commit
term uint64 // The current vote term
pterm uint64 // Previous term from the last snapshot
pindex uint64 // Previous index from the last snapshot
commit uint64 // Sequence number of the most recent commit
applied uint64 // Sequence number of the most recently applied commit
hcbehind bool // Were we falling behind at the last health check? (see: isCurrent)
leader string // The ID of the leader
vote string // Our current vote state
@@ -1052,7 +1053,11 @@ func (n *raft) InstallSnapshot(data []byte) error {
sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex)
sfile := filepath.Join(snapDir, sn)
if err := os.WriteFile(sfile, n.encodeSnapshot(snap), defaultFilePerms); err != nil {
<-dios
err := os.WriteFile(sfile, n.encodeSnapshot(snap), defaultFilePerms)
dios <- struct{}{}
if err != nil {
n.Unlock()
// We could set write err here, but if this is a temporary situation, too many open files etc.
// we want to retry and snapshots are not fatal.
@@ -1187,7 +1192,11 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) {
if n.snapfile == _EMPTY_ {
return nil, errNoSnapAvailable
}
<-dios
buf, err := os.ReadFile(n.snapfile)
dios <- struct{}{}
if err != nil {
n.warn("Error reading snapshot: %v", err)
os.Remove(n.snapfile)
@@ -1269,8 +1278,18 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
return false
}
// If we were previously logging about falling behind, also log when the problem
// was cleared.
clearBehindState := func() {
if n.hcbehind {
n.warn("Health check OK, no longer falling behind")
n.hcbehind = false
}
}
// Make sure we are the leader or we know we have heard from the leader recently.
if n.State() == Leader {
clearBehindState()
return true
}
@@ -1294,6 +1313,7 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
if n.commit == n.applied {
// At this point if we are current, we can return saying so.
clearBehindState()
return true
} else if !includeForwardProgress {
// Otherwise, if we aren't allowed to include forward progress
@@ -1311,11 +1331,13 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
n.Lock()
if n.commit-n.applied < startDelta {
// The gap is getting smaller, so we're making forward progress.
clearBehindState()
return true
}
}
}
n.hcbehind = true
n.warn("Falling behind in health check, commit %d != applied %d", n.commit, n.applied)
return false
}
@@ -3678,14 +3700,22 @@ func writePeerState(sd string, ps *peerState) error {
if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) {
return err
}
if err := os.WriteFile(psf, encodePeerState(ps), defaultFilePerms); err != nil {
<-dios
err := os.WriteFile(psf, encodePeerState(ps), defaultFilePerms)
dios <- struct{}{}
if err != nil {
return err
}
return nil
}
func readPeerState(sd string) (ps *peerState, err error) {
<-dios
buf, err := os.ReadFile(filepath.Join(sd, peerStateFile))
dios <- struct{}{}
if err != nil {
return nil, err
}
@@ -3698,7 +3728,10 @@ const termVoteLen = idLen + 8
// readTermVote will read the largest term and who we voted from to stable storage.
// Lock should be held.
func (n *raft) readTermVote() (term uint64, voted string, err error) {
<-dios
buf, err := os.ReadFile(filepath.Join(n.sd, termVoteFile))
dios <- struct{}{}
if err != nil {
return 0, noVote, err
}
@@ -3920,6 +3953,10 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
n.resetElect(randCampaignTimeout())
}
}
// Term might have changed, make sure response has the most current
vresp.term = n.term
n.Unlock()
n.sendReply(vr.reply, vresp.encode())

View File

@@ -1780,7 +1780,15 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie
// the connection as stale based on the ping interval and max out values,
// but without actually sending pings.
if compressionConfigured {
c.ping.tmr = time.AfterFunc(opts.PingInterval*time.Duration(opts.MaxPingsOut+1), func() {
pingInterval := opts.PingInterval
pingMax := opts.MaxPingsOut
if opts.Cluster.PingInterval > 0 {
pingInterval = opts.Cluster.PingInterval
}
if opts.Cluster.MaxPingsOut > 0 {
pingMax = opts.MaxPingsOut
}
c.ping.tmr = time.AfterFunc(pingInterval*time.Duration(pingMax+1), func() {
c.mu.Lock()
c.Debugf("Stale Client Connection - Closing")
c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection")))

View File

@@ -637,9 +637,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
mset.store.FastState(&state)
// Possible race with consumer.setLeader during recovery.
mset.mu.Lock()
mset.mu.RLock()
mset.lseq = state.LastSeq
mset.mu.Unlock()
mset.mu.RUnlock()
// If no msgs (new stream), set dedupe state loaded to true.
if state.Msgs == 0 {
@@ -5763,7 +5763,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
return mset, nil
}
// This is to check for dangling messages on interest retention streams.
// This is to check for dangling messages on interest retention streams. Only called on account enable.
// Issue https://github.com/nats-io/nats-server/issues/3612
func (mset *stream) checkForOrphanMsgs() {
mset.mu.RLock()
@@ -5771,9 +5771,23 @@ func (mset *stream) checkForOrphanMsgs() {
for _, o := range mset.consumers {
consumers = append(consumers, o)
}
accName, stream := mset.acc.Name, mset.cfg.Name
var ss StreamState
mset.store.FastState(&ss)
mset.mu.RUnlock()
for _, o := range consumers {
o.checkStateForInterestStream()
if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq {
o.mu.RLock()
s, consumer := o.srv, o.name
state, _ := o.store.State()
asflr := state.AckFloor.Stream
o.mu.RUnlock()
// Warn about stream state vs our ack floor.
s.RateLimitWarnf("Detected consumer '%s > %s > %s' ack floor %d is ahead of stream's last sequence %d",
accName, stream, consumer, asflr, ss.LastSeq)
}
}
}

View File

@@ -0,0 +1,68 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
import (
"fmt"
"io"
"strings"
)
// For dumping out a text representation of a tree.
// Dump writes a human-readable dump of the whole tree to w. Debug aid only.
func (t *SubjectTree[T]) Dump(w io.Writer) {
	t.dump(w, t.root, 0)
	fmt.Fprintln(w)
}

// Will dump out a node.
// dump recursively prints n and, for internal nodes, every child,
// using depth to compute the indentation prefix via dumpPre.
func (t *SubjectTree[T]) dump(w io.Writer, n node, depth int) {
	if n == nil {
		fmt.Fprintf(w, "EMPTY\n")
		return
	}
	if n.isLeaf() {
		leaf := n.(*leaf[T])
		fmt.Fprintf(w, "%s LEAF: Suffix: %q Value: %+v\n", dumpPre(depth), leaf.suffix, leaf.value)
		n = nil
	} else {
		// We are a node type here, grab meta portion.
		bn := n.base()
		fmt.Fprintf(w, "%s %s Prefix: %q\n", dumpPre(depth), n.kind(), bn.prefix[:bn.prefixLen])
		depth++
		n.iter(func(n node) bool {
			t.dump(w, n, depth)
			return true
		})
	}
}

// For individual node/leaf dumps.
func (n *leaf[T]) kind() string { return "LEAF" }
func (n *node4) kind() string   { return "NODE4" }
func (n *node16) kind() string  { return "NODE16" }
func (n *node256) kind() string { return "NODE256" }

// Calculates the indentation prefix for the given tree depth.
func dumpPre(depth int) string {
	if depth == 0 {
		return "-- "
	} else {
		var b strings.Builder
		for i := 0; i < depth; i++ {
			b.WriteString("  ")
		}
		b.WriteString("|__ ")
		return b.String()
	}
}

View File

@@ -0,0 +1,50 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
import (
"bytes"
)
// Leaf node
type leaf[T any] struct {
	// This could be the whole subject, but most likely just the suffix portion.
	// We will only store the suffix here and assume all prior prefix paths have
	// been checked once we arrive at this leafnode.
	suffix []byte
	value  T
}

// newLeaf creates a leaf owning a copy of suffix alongside the stored value.
func newLeaf[T any](suffix []byte, value T) *leaf[T] {
	return &leaf[T]{copyBytes(suffix), value}
}

func (n *leaf[T]) isLeaf() bool { return true }
func (n *leaf[T]) base() *meta  { return nil }

// match reports whether subject equals our stored suffix exactly.
func (n *leaf[T]) match(subject []byte) bool { return bytes.Equal(subject, n.suffix) }

// setSuffix replaces our suffix with a copy of the given bytes.
func (n *leaf[T]) setSuffix(suffix []byte) { n.suffix = copyBytes(suffix) }

// A leaf has no child slots, so it always reports full.
func (n *leaf[T]) isFull() bool { return true }

func (n *leaf[T]) matchParts(parts [][]byte) ([][]byte, bool) { return matchParts(parts, n.suffix) }

// Leaves have no children to iterate or return.
func (n *leaf[T]) iter(f func(node) bool) {}
func (n *leaf[T]) children() []node       { return nil }
func (n *leaf[T]) numChildren() uint16    { return 0 }
func (n *leaf[T]) path() []byte           { return n.suffix }

// Not applicable to leafs and should not be called, so panic if we do.
func (n *leaf[T]) setPrefix(pre []byte)    { panic("setPrefix called on leaf") }
func (n *leaf[T]) addChild(_ byte, _ node) { panic("addChild called on leaf") }
func (n *leaf[T]) findChild(_ byte) *node  { panic("findChild called on leaf") }
func (n *leaf[T]) grow() node              { panic("grow called on leaf") }
func (n *leaf[T]) deleteChild(_ byte)      { panic("deleteChild called on leaf") }
func (n *leaf[T]) shrink() node            { panic("shrink called on leaf") }

View File

@@ -0,0 +1,45 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
// Internal node interface.
// Implemented by node4, node16 and node256 (and, degenerately, by leaf,
// whose child-related methods panic), letting the tree change node fan-out
// as children are added and removed.
type node interface {
	isLeaf() bool
	base() *meta
	setPrefix(pre []byte)
	addChild(c byte, n node)
	findChild(c byte) *node
	deleteChild(c byte)
	isFull() bool
	grow() node
	shrink() node
	matchParts(parts [][]byte) ([][]byte, bool)
	kind() string
	iter(f func(node) bool)
	children() []node
	numChildren() uint16
	path() []byte
}

// Maximum prefix len
// We expect the most savings to come from long shared prefixes.
// This also makes the meta base layer exactly 64 bytes, a normal L1 cache line.
const maxPrefixLen = 60

// 64 bytes total - an L1 cache line.
type meta struct {
	prefix    [maxPrefixLen]byte // Shared (compressed) path prefix bytes.
	prefixLen uint16             // Number of valid bytes in prefix.
	size      uint16             // Number of children currently stored.
}

View File

@@ -0,0 +1,121 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
// Node with 16 children
type node16 struct {
	meta
	child [16]node
	key   [16]byte // key[i] is the pivot byte leading to child[i]; entries are unsorted.
}

// newNode16 returns an empty node16 with the given path prefix.
func newNode16(prefix []byte) *node16 {
	nn := &node16{}
	nn.setPrefix(prefix)
	return nn
}

func (n *node16) isLeaf() bool { return false }
func (n *node16) base() *meta  { return &n.meta }

// setPrefix copies pre into the fixed prefix array, truncating at maxPrefixLen.
func (n *node16) setPrefix(pre []byte) {
	n.prefixLen = uint16(min(len(pre), maxPrefixLen))
	for i := uint16(0); i < n.prefixLen; i++ {
		n.prefix[i] = pre[i]
	}
}

// Currently we do not keep node16 sorted or use bitfields for traversal so just add to the end.
// TODO(dlc) - We should revisit here with more detailed benchmarks.
func (n *node16) addChild(c byte, nn node) {
	if n.size >= 16 {
		panic("node16 full!")
	}
	n.key[n.size] = c
	n.child[n.size] = nn
	n.size++
}

func (n *node16) numChildren() uint16 { return n.size }
func (n *node16) path() []byte        { return n.prefix[:n.prefixLen] }

// findChild linearly scans the keys for c, returning a pointer to the
// child slot (so callers can replace nodes in place), or nil if absent.
func (n *node16) findChild(c byte) *node {
	for i := uint16(0); i < n.size; i++ {
		if n.key[i] == c {
			return &n.child[i]
		}
	}
	return nil
}

func (n *node16) isFull() bool { return n.size >= 16 }

// grow promotes this node to a node256 carrying the same prefix.
// Copies all 16 slots — assumes the node is full when grown.
func (n *node16) grow() node {
	nn := newNode256(n.prefix[:n.prefixLen])
	for i := 0; i < 16; i++ {
		nn.addChild(n.key[i], n.child[i])
	}
	return nn
}

// Deletes a child from the node.
func (n *node16) deleteChild(c byte) {
	for i, last := uint16(0), n.size-1; i < n.size; i++ {
		if n.key[i] == c {
			// Unsorted so just swap in last one here, else nil if last.
			if i < last {
				n.key[i] = n.key[last]
				n.child[i] = n.child[last]
				n.key[last] = 0
				n.child[last] = nil
			} else {
				n.key[i] = 0
				n.child[i] = nil
			}
			n.size--
			return
		}
	}
}

// Shrink if needed and return new node, otherwise return nil.
// Demotes to a node4 once at most 4 children remain.
func (n *node16) shrink() node {
	if n.size > 4 {
		return nil
	}
	nn := newNode4(nil)
	for i := uint16(0); i < n.size; i++ {
		nn.addChild(n.key[i], n.child[i])
	}
	return nn
}

// Will match parts against our prefix.
func (n *node16) matchParts(parts [][]byte) ([][]byte, bool) {
	return matchParts(parts, n.prefix[:n.prefixLen])
}

// Iterate over all children calling func f.
func (n *node16) iter(f func(node) bool) {
	for i := uint16(0); i < n.size; i++ {
		if !f(n.child[i]) {
			return
		}
	}
}

// Return our children as a slice.
func (n *node16) children() []node {
	return n.child[:n.size]
}

View File

@@ -0,0 +1,97 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
// Node with 256 children
// Children are direct-indexed by the pivot byte, so lookups are O(1).
type node256 struct {
	meta
	child [256]node
}

// newNode256 returns an empty node256 with the given path prefix.
func newNode256(prefix []byte) *node256 {
	nn := &node256{}
	nn.setPrefix(prefix)
	return nn
}

func (n *node256) isLeaf() bool { return false }
func (n *node256) base() *meta  { return &n.meta }

// setPrefix copies pre into the fixed prefix array, truncating at maxPrefixLen.
func (n *node256) setPrefix(pre []byte) {
	n.prefixLen = uint16(min(len(pre), maxPrefixLen))
	for i := uint16(0); i < n.prefixLen; i++ {
		n.prefix[i] = pre[i]
	}
}

// addChild stores nn at index c. Note size is incremented unconditionally,
// so callers must not add a duplicate pivot byte.
func (n *node256) addChild(c byte, nn node) {
	n.child[c] = nn
	n.size++
}

func (n *node256) numChildren() uint16 { return n.size }
func (n *node256) path() []byte        { return n.prefix[:n.prefixLen] }

// findChild returns a pointer to the child slot for c, or nil if empty.
func (n *node256) findChild(c byte) *node {
	if n.child[c] != nil {
		return &n.child[c]
	}
	return nil
}

// A node256 is the widest node type, so it can never be full and never grows.
func (n *node256) isFull() bool { return false }
func (n *node256) grow() node   { panic("grow can not be called on node256") }

// Deletes a child from the node.
func (n *node256) deleteChild(c byte) {
	if n.child[c] != nil {
		n.child[c] = nil
		n.size--
	}
}

// Shrink if needed and return new node, otherwise return nil.
// Demotes to a node16 once at most 16 children remain.
func (n *node256) shrink() node {
	if n.size > 16 {
		return nil
	}
	nn := newNode16(nil)
	for c, child := range n.child {
		if child != nil {
			nn.addChild(byte(c), n.child[c])
		}
	}
	return nn
}

// Will match parts against our prefix.
func (n *node256) matchParts(parts [][]byte) ([][]byte, bool) {
	return matchParts(parts, n.prefix[:n.prefixLen])
}

// Iterate over all children calling func f.
func (n *node256) iter(f func(node) bool) {
	for i := 0; i < 256; i++ {
		if n.child[i] != nil {
			if !f(n.child[i]) {
				return
			}
		}
	}
}

// Return our children as a slice.
// Note: returns all 256 slots including nil entries.
func (n *node256) children() []node {
	return n.child[:256]
}

View File

@@ -0,0 +1,116 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
// Node with 4 children
type node4 struct {
	meta
	child [4]node
	key   [4]byte // key[i] is the pivot byte leading to child[i]; entries are unsorted.
}

// newNode4 returns an empty node4 with the given path prefix.
func newNode4(prefix []byte) *node4 {
	nn := &node4{}
	nn.setPrefix(prefix)
	return nn
}

func (n *node4) isLeaf() bool { return false }
func (n *node4) base() *meta  { return &n.meta }

// setPrefix copies pre into the fixed prefix array, truncating at maxPrefixLen.
func (n *node4) setPrefix(pre []byte) {
	n.prefixLen = uint16(min(len(pre), maxPrefixLen))
	for i := uint16(0); i < n.prefixLen; i++ {
		n.prefix[i] = pre[i]
	}
}

// Currently we do not need to keep sorted for traversal so just add to the end.
func (n *node4) addChild(c byte, nn node) {
	if n.size >= 4 {
		panic("node4 full!")
	}
	n.key[n.size] = c
	n.child[n.size] = nn
	n.size++
}

func (n *node4) numChildren() uint16 { return n.size }
func (n *node4) path() []byte        { return n.prefix[:n.prefixLen] }

// findChild linearly scans the keys for c, returning a pointer to the
// child slot (so callers can replace nodes in place), or nil if absent.
func (n *node4) findChild(c byte) *node {
	for i := uint16(0); i < n.size; i++ {
		if n.key[i] == c {
			return &n.child[i]
		}
	}
	return nil
}

func (n *node4) isFull() bool { return n.size >= 4 }

// grow promotes this node to a node16 carrying the same prefix.
// Copies all 4 slots — assumes the node is full when grown.
func (n *node4) grow() node {
	nn := newNode16(n.prefix[:n.prefixLen])
	for i := 0; i < 4; i++ {
		nn.addChild(n.key[i], n.child[i])
	}
	return nn
}

// Deletes a child from the node.
func (n *node4) deleteChild(c byte) {
	for i, last := uint16(0), n.size-1; i < n.size; i++ {
		if n.key[i] == c {
			// Unsorted so just swap in last one here, else nil if last.
			if i < last {
				n.key[i] = n.key[last]
				n.child[i] = n.child[last]
				n.key[last] = 0
				n.child[last] = nil
			} else {
				n.key[i] = 0
				n.child[i] = nil
			}
			n.size--
			return
		}
	}
}

// Shrink if needed and return new node, otherwise return nil.
// A node4 with a single child collapses directly into that child.
func (n *node4) shrink() node {
	if n.size == 1 {
		return n.child[0]
	}
	return nil
}

// Will match parts against our prefix.
func (n *node4) matchParts(parts [][]byte) ([][]byte, bool) {
	return matchParts(parts, n.prefix[:n.prefixLen])
}

// Iterate over all children calling func f.
func (n *node4) iter(f func(node) bool) {
	for i := uint16(0); i < n.size; i++ {
		if !f(n.child[i]) {
			return
		}
	}
}

// Return our children as a slice.
func (n *node4) children() []node {
	return n.child[:n.size]
}

View File

@@ -0,0 +1,134 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
import (
"bytes"
)
// genParts will break a filter subject up into parts.
// We need to break this up into chunks based on wildcards, either pwc '*' or fwc '>'.
// We do not care about other tokens per se, just parts that are separated by wildcards with an optional end fwc.
// The parts argument is a scratch buffer that results are appended to (callers pass a stack-backed slice[:0]).
func genParts(filter []byte, parts [][]byte) [][]byte {
	var start int
	for i, e := 0, len(filter)-1; i < len(filter); i++ {
		if filter[i] == tsep {
			// See if next token is pwc. Either internal or end pwc.
			if i < e && filter[i+1] == pwc && (i+2 <= e && filter[i+2] == tsep || i+1 == e) {
				if i > start {
					// Emit the literal run before the wildcard, including its trailing tsep.
					parts = append(parts, filter[start:i+1])
				}
				// The pwc becomes its own single-byte part.
				parts = append(parts, filter[i+1:i+2])
				i++ // Skip pwc
				if i+2 <= e {
					i++ // Skip next tsep from next part too.
				}
				start = i + 1
			} else if i < e && filter[i+1] == fwc && i+1 == e {
				// We have a fwc
				if i > start {
					parts = append(parts, filter[start:i+1])
				}
				parts = append(parts, filter[i+1:i+2])
				i++ // Skip fwc
				start = i + 1
			}
		} else if filter[i] == pwc || filter[i] == fwc {
			// We start with a pwc or fwc.
			parts = append(parts, filter[i:i+1])
			if i+1 <= e {
				i++ // Skip next tsep from next part too.
			}
			start = i + 1
		}
	}
	// Emit any trailing literal run.
	if start < len(filter) {
		// Check to see if we need to eat a leading tsep.
		if filter[start] == tsep {
			start++
		}
		parts = append(parts, filter[start:])
	}
	return parts
}
// Match our parts against a fragment, which could be prefix for nodes or a suffix for leafs.
// Returns the parts still unmatched (for deeper nodes to consume) and whether the
// fragment matched so far; nil parts with true means a complete match.
func matchParts(parts [][]byte, frag []byte) ([][]byte, bool) {
	if len(frag) == 0 {
		return parts, true
	}
	var si int // Index into frag of the next byte to match.
	lpi := len(parts) - 1
	lf := len(frag)
	for i, part := range parts {
		if si >= lf {
			// Fragment fully consumed; remaining parts carry to the next node.
			return parts[i:], true
		}
		lp := len(part)
		// Check for pwc or fwc place holders.
		if lp == 1 {
			if part[0] == pwc {
				index := bytes.IndexByte(frag[si:], tsep)
				// We are trying to match pwc and did not find our tsep.
				// Will need to move to next node from caller.
				if index < 0 {
					if i == lpi {
						return nil, true
					}
					return parts[i:], true
				}
				si += index + 1
				continue
			} else if part[0] == fwc {
				// If we are here we should be good.
				return nil, true
			}
		}
		end := min(si+lp, len(frag))
		// If part is bigger than the fragment, adjust to a portion of the part.
		// NOTE(review): partialPart compares lp against the absolute index end
		// (not end-si); appears to rely on si being 0 when partials occur — confirm.
		partialPart := lp > end
		if partialPart {
			// Frag is smaller than part itself.
			part = part[:end]
		}
		if !bytes.Equal(part, frag[si:end]) {
			return parts, false
		}
		// If we still have a portion of the fragment left, update and continue.
		if end < lf {
			si = end
			continue
		}
		// If we matched a partial, do not move past current part
		// but update the part to what was consumed. This allows upper layers to continue.
		if end < lp {
			if end >= lf {
				parts = append([][]byte{}, parts...) // Create a copy before modifying.
				parts[i] = parts[i][lf:]
			} else {
				i++
			}
			return parts[i:], true
		}
		if i == lpi {
			return nil, true
		}
		// If we are here we are not the last part which means we have a wildcard
		// gap, so we need to match anything up to next tsep.
		si += len(part)
	}
	return parts, false
}

View File

@@ -0,0 +1,380 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
import (
"bytes"
"sort"
)
// SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects.
// Will use dynamic nodes, path compression and lazy expansion.
// The reason this exists is to not only save some memory in our filestore but to greatly optimize matching
// a wildcard subject to certain members, e.g. consumer NumPending calculations.
type SubjectTree[T any] struct {
root node
size int
}
// NewSubjectTree creates an empty SubjectTree holding values of type T.
func NewSubjectTree[T any]() *SubjectTree[T] {
	return new(SubjectTree[T])
}
// Size reports how many values are currently stored.
// A nil tree is treated as empty.
func (t *SubjectTree[T]) Size() int {
	if t != nil {
		return t.size
	}
	return 0
}
// Empty clears out all entries, or allocates a fresh tree when called on nil.
func (t *SubjectTree[T]) Empty() *SubjectTree[T] {
	if t == nil {
		return NewSubjectTree[T]()
	}
	t.root = nil
	t.size = 0
	return t
}
// Insert stores value under the literal subject. When the subject was already
// present, the previous value and true are returned; otherwise nil and false.
func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) {
	old, updated := t.insert(&t.root, subject, value, 0)
	if updated {
		return old, true
	}
	// A brand new entry was added, so account for it.
	t.size++
	return old, false
}
// Find will find the value and return it or false if it was not found.
func (t *SubjectTree[T]) Find(subject []byte) (*T, bool) {
	var si uint16 // Index into subject of the next unmatched byte.
	for n := t.root; n != nil; {
		// Leaves hold the remaining suffix; an exact match means found.
		if n.isLeaf() {
			if ln := n.(*leaf[T]); ln.match(subject[si:]) {
				return &ln.value, true
			}
			return nil, false
		}
		// We are a node type here, grab meta portion.
		if bn := n.base(); bn.prefixLen > 0 {
			// Clamp end to len(subject) so the slice below cannot go out of
			// range; a too-short subject then fails the Equal length check.
			end := min(int(si+bn.prefixLen), len(subject))
			if !bytes.Equal(subject[si:end], bn.prefix[:bn.prefixLen]) {
				return nil, false
			}
			// Increment our subject index.
			si += bn.prefixLen
		}
		// Descend to the child keyed by the next subject byte (0 past end).
		if an := n.findChild(pivot(subject, si)); an != nil {
			n = *an
		} else {
			return nil, false
		}
	}
	return nil, false
}
// Delete removes the entry stored under the literal subject and returns its
// value, or nil and false when no such entry exists.
func (t *SubjectTree[T]) Delete(subject []byte) (*T, bool) {
	val, deleted := t.delete(&t.root, subject, 0)
	if !deleted {
		return val, false
	}
	t.size--
	return val, true
}
// Match invokes cb for every stored entry whose subject matches filter.
// The filter may contain the wildcards pwc '*' and fwc '>'.
func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) {
	if cb == nil || len(filter) == 0 {
		return
	}
	// Decompose the filter into chunks split on wildcards, either pwc '*' or fwc '>'.
	var scratch [16][]byte
	parts := genParts(filter, scratch[:0])
	// Stack buffer used to accumulate the subject prefix while walking down.
	var pre [256]byte
	t.match(t.root, parts, pre[:0], cb)
}
// Iter walks all entries in the SubjectTree in lexicographic order, invoking
// cb for each. Returning false from cb terminates the walk early.
func (t *SubjectTree[T]) Iter(cb func(subject []byte, val *T) bool) {
	if t == nil || t.root == nil {
		return
	}
	// Stack buffer used to accumulate the subject prefix while walking down.
	var pre [256]byte
	t.iter(t.root, pre[:0], cb)
}
// Internal methods
// Internal call to insert that can be recursive.
// np points at the slot holding the current node, si is the number of subject
// bytes already consumed. Returns the old value and true when an existing
// entry was replaced.
func (t *SubjectTree[T]) insert(np *node, subject []byte, value T, si int) (*T, bool) {
	n := *np
	if n == nil {
		// Empty slot, just store the subject in a new leaf.
		*np = newLeaf(subject, value)
		return nil, false
	}
	if n.isLeaf() {
		ln := n.(*leaf[T])
		if ln.match(subject[si:]) {
			// Replace with new value.
			old := ln.value
			ln.value = value
			return &old, true
		}
		// Here we need to split this leaf.
		cpi := commonPrefixLen(ln.suffix, subject[si:])
		nn := newNode4(subject[si : si+cpi])
		ln.setSuffix(ln.suffix[cpi:])
		si += cpi
		// Make sure we have different pivot, normally this will be the case unless we have overflowing prefixes.
		if p := pivot(ln.suffix, 0); si < len(subject) && p == subject[si] {
			// We need to split the original leaf. Recursively call into insert.
			t.insert(np, subject, value, si)
			// Now add the update version of *np as a child to the new node4.
			nn.addChild(p, *np)
		} else {
			// Can just add this new leaf as a sibling.
			nl := newLeaf(subject[si:], value)
			nn.addChild(pivot(nl.suffix, 0), nl)
			// Add back original.
			nn.addChild(pivot(ln.suffix, 0), ln)
		}
		*np = nn
		return nil, false
	}
	// Non-leaf nodes.
	bn := n.base()
	if bn.prefixLen > 0 {
		cpi := commonPrefixLen(bn.prefix[:bn.prefixLen], subject[si:])
		if pli := int(bn.prefixLen); cpi >= pli {
			// Move past this node. We look for an existing child node to recurse into.
			// If one does not exist we can create a new leaf node.
			si += pli
			if nn := n.findChild(pivot(subject, si)); nn != nil {
				return t.insert(nn, subject, value, si)
			}
			// Grow the node (e.g. node4 -> larger) when out of child slots.
			if n.isFull() {
				n = n.grow()
				*np = n
			}
			n.addChild(pivot(subject, si), newLeaf(subject[si:], value))
			return nil, false
		} else {
			// We did not match the prefix completely here.
			// Calculate new prefix for this node.
			prefix := subject[si : si+cpi]
			si += len(prefix)
			// We will insert a new node4 and attach our current node below after adjusting prefix.
			nn := newNode4(prefix)
			// Shift the prefix for our original node.
			n.setPrefix(bn.prefix[cpi:bn.prefixLen])
			nn.addChild(pivot(bn.prefix[:], 0), n)
			// Add in our new leaf.
			nn.addChild(pivot(subject[si:], 0), newLeaf(subject[si:], value))
			// Update our node reference.
			*np = nn
		}
	} else {
		if nn := n.findChild(pivot(subject, si)); nn != nil {
			return t.insert(nn, subject, value, si)
		}
		// No prefix and no matched child, so add in new leafnode as needed.
		if n.isFull() {
			n = n.grow()
			*np = n
		}
		n.addChild(pivot(subject, si), newLeaf(subject[si:], value))
	}
	return nil, false
}
// internal function to recursively find the leaf to delete. Will do compaction if the item is found and removed.
// np points at the slot holding the current node, si is the count of subject
// bytes already consumed.
func (t *SubjectTree[T]) delete(np *node, subject []byte, si uint16) (*T, bool) {
	if t == nil || np == nil || *np == nil || len(subject) == 0 {
		return nil, false
	}
	n := *np
	if n.isLeaf() {
		ln := n.(*leaf[T])
		if ln.match(subject[si:]) {
			*np = nil
			return &ln.value, true
		}
		return nil, false
	}
	// Not a leaf node.
	if bn := n.base(); bn.prefixLen > 0 {
		// Bounds-check before slicing: a compressed prefix can be longer than
		// what remains of the subject (e.g. deleting "foo" when the node's
		// prefix is "fooba"), which would otherwise panic. Find already
		// guards for this via min; mirror that safety here.
		end := int(si) + int(bn.prefixLen)
		if end > len(subject) || !bytes.Equal(subject[si:end], bn.prefix[:bn.prefixLen]) {
			return nil, false
		}
		// Increment our subject index.
		si += bn.prefixLen
	}
	p := pivot(subject, si)
	nna := n.findChild(p)
	if nna == nil {
		return nil, false
	}
	nn := *nna
	if nn.isLeaf() {
		ln := nn.(*leaf[T])
		if ln.match(subject[si:]) {
			n.deleteChild(p)
			// After removal the node may collapse to a single child; if so,
			// merge this node's prefix into the survivor.
			if sn := n.shrink(); sn != nil {
				bn := n.base()
				// Make sure to set cap so we force an append to copy below.
				pre := bn.prefix[:bn.prefixLen:bn.prefixLen]
				// Need to fix up prefixes/suffixes.
				if sn.isLeaf() {
					ln := sn.(*leaf[T])
					// Make sure to set cap so we force an append to copy.
					ln.suffix = append(pre, ln.suffix...)
				} else {
					// We are a node here, we need to add in the old prefix.
					if len(pre) > 0 {
						bsn := sn.base()
						sn.setPrefix(append(pre, bsn.prefix[:bsn.prefixLen]...))
					}
				}
				*np = sn
			}
			return &ln.value, true
		}
		return nil, false
	}
	return t.delete(nna, subject, si)
}
// Internal function which can be called recursively to match all leaf nodes to a given filter subject which
// once here has been decomposed to parts. These parts only care about wildcards, both pwc and fwc.
// pre accumulates the literal subject prefix consumed so far for the callback.
func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subject []byte, val *T)) {
	// Capture if we are sitting on a terminal fwc.
	var hasFWC bool
	if lp := len(parts); lp > 0 && parts[lp-1][0] == fwc {
		hasFWC = true
	}
	for n != nil {
		nparts, matched := n.matchParts(parts)
		// Check if we did not match.
		if !matched {
			return
		}
		// We have matched here. If we are a leaf and have exhausted all parts or we have a FWC fire callback.
		if n.isLeaf() {
			if len(nparts) == 0 || hasFWC {
				ln := n.(*leaf[T])
				cb(append(pre, ln.suffix...), &ln.value)
			}
			return
		}
		// We have normal nodes here.
		// We need to append our prefix
		bn := n.base()
		if bn.prefixLen > 0 {
			// Note that this append may reallocate, but it doesn't modify "pre" at the "match" callsite.
			pre = append(pre, bn.prefix[:bn.prefixLen]...)
		}
		// Check our remaining parts.
		if len(nparts) == 0 && !hasFWC {
			// We are a node with no parts left and we are not looking at a fwc.
			// We could have a leafnode with no suffix which would be a match.
			// We could also have a terminal pwc. Check for those here.
			var hasTermPWC bool
			if lp := len(parts); lp > 0 && len(parts[lp-1]) == 1 && parts[lp-1][0] == pwc {
				// If we are sitting on a terminal pwc, put the pwc back and continue.
				nparts = parts[len(parts)-1:]
				hasTermPWC = true
			}
			for _, cn := range n.children() {
				if cn == nil {
					continue
				}
				if cn.isLeaf() {
					ln := cn.(*leaf[T])
					if len(ln.suffix) == 0 {
						// Empty suffix: this leaf terminates exactly here, so it matches.
						cb(append(pre, ln.suffix...), &ln.value)
					} else if hasTermPWC && bytes.IndexByte(ln.suffix, tsep) < 0 {
						// Terminal pwc matches any single remaining token (no tsep in suffix).
						cb(append(pre, ln.suffix...), &ln.value)
					}
				} else if hasTermPWC {
					// We have terminal pwc so call into match again with the child node.
					t.match(cn, nparts, pre, cb)
				}
			}
			// Return regardless.
			return
		}
		// If we are sitting on a terminal fwc, put back and continue.
		if hasFWC && len(nparts) == 0 {
			nparts = parts[len(parts)-1:]
		}
		// Here we are a node type with a partial match.
		// Check if the first part is a wildcard.
		fp := nparts[0]
		p := pivot(fp, 0)
		// Check if we have a pwc/fwc part here. This will cause us to iterate.
		if len(fp) == 1 && (p == pwc || p == fwc) {
			// We need to iterate over all children here for the current node
			// to see if we match further down.
			for _, cn := range n.children() {
				if cn != nil {
					t.match(cn, nparts, pre, cb)
				}
			}
		}
		// Here we have normal traversal, so find the next child.
		nn := n.findChild(p)
		if nn == nil {
			return
		}
		n, parts = *nn, nparts
	}
}
// Internal iter function to walk nodes in lexicographical order.
// Returns false when the callback requested termination of the walk.
func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T) bool) bool {
	if n.isLeaf() {
		ln := n.(*leaf[T])
		return cb(append(pre, ln.suffix...), &ln.value)
	}
	// We are normal node here.
	bn := n.base()
	// Note that this append may reallocate, but it doesn't modify "pre" at the "iter" callsite.
	pre = append(pre, bn.prefix[:bn.prefixLen]...)
	// Collect nodes since unsorted.
	var _nodes [256]node
	nodes := _nodes[:0]
	for _, cn := range n.children() {
		if cn != nil {
			nodes = append(nodes, cn)
		}
	}
	// Now sort by child path so the walk stays lexicographical.
	sort.SliceStable(nodes, func(i, j int) bool { return bytes.Compare(nodes[i].path(), nodes[j].path()) < 0 })
	// Now walk the nodes in order and call into next iter.
	for i := range nodes {
		if !t.iter(nodes[i], pre, cb) {
			return false
		}
	}
	return true
}

View File

@@ -0,0 +1,61 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stree
// For subject matching.
const (
	pwc  = '*' // Partial wildcard, matches a single token.
	fwc  = '>' // Full wildcard, matches the remainder of a subject.
	tsep = '.' // Token separator.
)
// commonPrefixLen returns the number of leading bytes shared by s1 and s2,
// capped at maxPrefixLen. No match at all yields 0.
func commonPrefixLen(s1, s2 []byte) int {
	limit := min(len(s1), len(s2))
	i := 0
	for i < limit && s1[i] == s2[i] {
		i++
	}
	return min(i, maxPrefixLen)
}
// copyBytes returns an independent copy of src, or nil when src is empty.
func copyBytes(src []byte) []byte {
	if len(src) == 0 {
		return nil
	}
	return append(make([]byte, 0, len(src)), src...)
}
// position constrains the integer types used to index into a subject.
type position interface{ int | uint16 }

// pivot returns the byte of subject at pos, or 0 when pos is at or past the
// end. Can return 0 if we have all the subject as prefixes.
func pivot[N position](subject []byte, pos N) byte {
	if p := int(pos); p < len(subject) {
		return subject[p]
	}
	return 0
}
// min returns the smaller of a and b.
// TODO(dlc) - Can be removed once the server builds with Go 1.21+, where min is a builtin.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}

6
vendor/modules.txt vendored
View File

@@ -1241,9 +1241,10 @@ github.com/justinas/alice
# github.com/kevinburke/ssh_config v1.2.0
## explicit
github.com/kevinburke/ssh_config
# github.com/klauspost/compress v1.17.4
# github.com/klauspost/compress v1.17.5
## explicit; go 1.19
github.com/klauspost/compress/flate
github.com/klauspost/compress/internal/race
github.com/klauspost/compress/s2
# github.com/klauspost/cpuid/v2 v2.1.0
## explicit; go 1.15
@@ -1398,7 +1399,7 @@ github.com/mschoch/smat
# github.com/nats-io/jwt/v2 v2.5.3
## explicit; go 1.18
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.10.9
# github.com/nats-io/nats-server/v2 v2.10.10
## explicit; go 1.20
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand
@@ -1409,6 +1410,7 @@ github.com/nats-io/nats-server/v2/server/avl
github.com/nats-io/nats-server/v2/server/certidp
github.com/nats-io/nats-server/v2/server/certstore
github.com/nats-io/nats-server/v2/server/pse
github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
# github.com/nats-io/nats.go v1.32.0
## explicit; go 1.20