diff --git a/go.mod b/go.mod index ca05b2c561..61995cc0bb 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/go-micro/plugins/v4/registry/nats v1.2.2-0.20230723205323-1ada01245674 github.com/go-micro/plugins/v4/server/grpc v1.2.0 github.com/go-micro/plugins/v4/server/http v1.2.2 + github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20230807070816-bc05fb076ce7 github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0 github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0 github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry v1.2.0 @@ -190,7 +191,6 @@ require ( github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 // indirect - github.com/go-micro/plugins/v4/store/nats-js v1.1.0 // indirect github.com/go-micro/plugins/v4/store/redis v1.2.1-0.20230510195111-07cd57e1bc9d // indirect github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect @@ -340,3 +340,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) + +replace github.com/go-micro/plugins/v4/store/nats-js => github.com/kobergj/plugins/v4/store/nats-js v1.2.1-0.20231020092801-9463c820c19a diff --git a/go.sum b/go.sum index 8bfbefde6a..58d544c369 100644 --- a/go.sum +++ b/go.sum @@ -1198,8 +1198,6 @@ github.com/go-micro/plugins/v4/server/grpc v1.2.0 h1:lXfM+/0oE/u1g0hVBYsvbP4lYOY github.com/go-micro/plugins/v4/server/grpc v1.2.0/go.mod h1:+Ah9Pf/vMSXxBM3fup/hc3N+zN2as3nIpcRaR4sBjnY= github.com/go-micro/plugins/v4/server/http v1.2.2 h1:UK2/09AU0zV3wHELuR72TZzVU2vTUhbx9qrRGrQSIWg= github.com/go-micro/plugins/v4/server/http v1.2.2/go.mod h1:YuAjaSPxcn3LI8j2FUsqx0Rxunrj4YwDV41Ax76rLl0= -github.com/go-micro/plugins/v4/store/nats-js v1.1.0 h1:6Fe1/eLtg8kRyaGvMILp4olYtTDGwYNBXyb1sYfAWGk= -github.com/go-micro/plugins/v4/store/nats-js v1.1.0/go.mod h1:jJf7Gm39OafZlT3s3UE2/9NIYj6OlI2fmZ4czSA3gvo= github.com/go-micro/plugins/v4/store/redis v1.2.1-0.20230510195111-07cd57e1bc9d h1:HQoDDVyMfdkrgXNo03ZY4vzhoOXMDZVZ4SnpBDVID6E= github.com/go-micro/plugins/v4/store/redis v1.2.1-0.20230510195111-07cd57e1bc9d/go.mod h1:MbCG0YiyPqETTtm7uHFmxQNCaW1o9hBoYtFwhbVjLUg= github.com/go-micro/plugins/v4/transport/grpc v1.1.0 h1:mXfDYfFQLnVDzjGY3o84oe4prfux9h8txsnA19dKsj8= @@ -1575,6 +1573,8 @@ github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.1.0 h1:eyi1Ad2aNJMW95zcSbmGg7Cg6cq3ADwLpMAP96d8rF0= github.com/klauspost/cpuid/v2 v2.1.0/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= +github.com/kobergj/plugins/v4/store/nats-js v1.2.1-0.20231020092801-9463c820c19a h1:W+itvdTLFGLuFh+E5IzW08n2BS02cHK91qnMo7SUxbA= +github.com/kobergj/plugins/v4/store/nats-js v1.2.1-0.20231020092801-9463c820c19a/go.mod h1:wt51O2yNmgF/F7E00IYIH0awseRGqtnmjZGn6RjbZSk= github.com/kolo/xmlrpc v0.0.0-20200310150728-e0350524596b/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/vendor/github.com/go-micro/plugins/v4/store/nats-js/context.go b/vendor/github.com/go-micro/plugins/v4/store/nats-js/context.go index e2eec2a6f8..07965c5118 100644 --- a/vendor/github.com/go-micro/plugins/v4/store/nats-js/context.go +++ b/vendor/github.com/go-micro/plugins/v4/store/nats-js/context.go @@ -6,7 +6,7 @@ import ( "go-micro.dev/v4/store" ) -// setStoreOption returns a function to setup a context with given value +// setStoreOption returns a function to setup a context with given value. func setStoreOption(k, v interface{}) store.Option { return func(o *store.Options) { if o.Context == nil { diff --git a/vendor/github.com/go-micro/plugins/v4/store/nats-js/nats.go b/vendor/github.com/go-micro/plugins/v4/store/nats-js/nats.go index 907c6c0e62..c0a4f4004f 100644 --- a/vendor/github.com/go-micro/plugins/v4/store/nats-js/nats.go +++ b/vendor/github.com/go-micro/plugins/v4/store/nats-js/nats.go @@ -34,14 +34,14 @@ type natsStore struct { conn *nats.Conn js nats.JetStreamContext - buckets map[string]nats.ObjectStore + buckets *sync.Map } func init() { cmd.DefaultStores["natsjs"] = NewStore } -// NewStore will create a new NATS JetStream Object Store +// NewStore will create a new NATS JetStream Object Store. func NewStore(opts ...store.Option) store.Store { options := store.Options{ Nodes: []string{}, @@ -55,7 +55,7 @@ func NewStore(opts ...store.Option) store.Store { opts: options, jsopts: []nats.JSOpt{}, objStoreConfigs: []*nats.ObjectStoreConfig{}, - buckets: map[string]nats.ObjectStore{}, + buckets: &sync.Map{}, storageType: nats.FileStorage, } @@ -64,7 +64,9 @@ func NewStore(opts ...store.Option) store.Store { return n } -// Init initialises the store. It must perform any required setup on the backing storage implementation and check that it is ready for use, returning any errors. +// Init initializes the store. It must perform any required setup on the +// backing storage implementation and check that it is ready for use, +// returning any errors. func (n *natsStore) Init(opts ...store.Option) error { n.setOption(opts...) @@ -101,7 +103,7 @@ func (n *natsStore) Init(opts ...store.Option) error { if err != nil { return errors.Wrapf(err, "Failed to create bucket (%s)", cfg.Bucket) } - n.buckets[cfg.Bucket] = store + n.buckets.Store(cfg.Bucket, store) } return nil @@ -159,10 +161,8 @@ func (n *natsStore) Options() store.Options { // Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error. func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { - if n.conn == nil { - if err := n.Init(); err != nil { - return nil, err - } + if err := n.initConn(); err != nil { + return nil, err } opt := store.ReadOptions{} @@ -178,51 +178,70 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, opt.Table = n.opts.Table } - bucket, ok := n.buckets[opt.Database] + b, ok := n.buckets.Load(opt.Database) if !ok { return nil, ErrBucketNotFound } + bucket := b.(nats.ObjectStore) var keys []string - objects, err := bucket.List() - if err == nats.ErrNoObjectsFound { - return []*store.Record{}, nil - } else if err != nil { - return nil, errors.Wrap(err, "Failed to list objects") + + var keyPrefix, keySuffix string + + switch { + case opt.Prefix: + keyPrefix = getKey(key, opt.Table) + case opt.Suffix: + keySuffix = key + default: + keys = []string{getKey(key, opt.Table)} } - for _, obj := range objects { - name := obj.Name - if (!opt.Prefix && !opt.Suffix) && getKey(key, opt.Table) != name { - continue - } - if opt.Prefix && !strings.HasPrefix(name, getKey(key, opt.Table)) { - continue + if len(keys) == 0 { + objects, err := bucket.List() + if err == nats.ErrNoObjectsFound { + return []*store.Record{}, nil + } else if err != nil { + return nil, errors.Wrap(err, "Failed to list objects") } - if opt.Suffix && !strings.HasSuffix(name, key) { - continue + for _, obj := range objects { + name := obj.Name + if !strings.HasPrefix(name, opt.Table) { + continue + } + if (!opt.Prefix && !opt.Suffix) && key != name { + continue + } + if opt.Prefix && !strings.HasPrefix(name, keyPrefix) { + continue + } + if opt.Suffix && !strings.HasSuffix(name, keySuffix) { + continue + } + keys = append(keys, name) } - keys = append(keys, name) } records := []*store.Record{} for _, key := range keys { obj, err := bucket.Get(key) - if err != nil { + if err == nats.ErrObjectNotFound { + return []*store.Record{}, nil + } else if err != nil { return nil, errors.Wrap(err, "Failed to get object from bucket") } - - b, err := io.ReadAll(obj) - if err != nil { - return nil, errors.Wrap(err, "Failed to read returned bytes") - } + defer obj.Close() info, err := obj.Info() if err != nil { return nil, errors.Wrap(err, "Failed to fetch record info") } + if info.Deleted { + continue + } + metadata := map[string]interface{}{} for key, value := range info.Headers { var val interface{} @@ -232,14 +251,17 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, metadata[key] = val } + b, err := io.ReadAll(obj) + if err != nil { + return nil, errors.Wrap(err, "Failed to read returned bytes") + } + records = append(records, &store.Record{ Key: key, Value: b, Metadata: metadata, }) - // Why is there a close method? - obj.Close() } if opt.Limit > 0 { @@ -253,10 +275,8 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, // Write writes a record to the store, and returns an error if the record was not written. func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error { - if n.conn == nil { - if err := n.Init(); err != nil { - return err - } + if err := n.initConn(); err != nil { + return err } opt := store.WriteOptions{} @@ -272,8 +292,8 @@ func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error { opt.Table = n.opts.Table } - store, ok := n.buckets[opt.Database] - + s, ok := n.buckets.Load(opt.Database) + store, _ := s.(nats.ObjectStore) // Create new bucket if not exists if !ok { var err error @@ -307,10 +327,8 @@ func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error { // Delete removes the record with the corresponding key from the store. func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error { - if n.conn == nil { - if err := n.Init(); err != nil { - return err - } + if err := n.initConn(); err != nil { + return err } opt := store.DeleteOptions{} @@ -327,18 +345,19 @@ func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error { } if opt.Table == "DELETE_BUCKET" { - delete(n.buckets, key) + n.buckets.Delete(key) if err := n.js.DeleteObjectStore(key); err != nil { return errors.Wrap(err, "Failed to delete bucket") } return nil } - store, ok := n.buckets[opt.Database] + s, ok := n.buckets.Load(opt.Database) if !ok { return ErrBucketNotFound } + store := s.(nats.ObjectStore) if err := store.Delete(getKey(key, opt.Table)); err != nil { return errors.Wrap(err, "Failed to delete data") } @@ -347,10 +366,8 @@ func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error { // List returns any keys that match, or an empty list with no error if none matched. func (n *natsStore) List(opts ...store.ListOption) ([]string, error) { - if n.conn == nil { - if err := n.Init(); err != nil { - return nil, err - } + if err := n.initConn(); err != nil { + return nil, err } opt := store.ListOptions{} @@ -366,10 +383,11 @@ func (n *natsStore) List(opts ...store.ListOption) ([]string, error) { opt.Table = n.opts.Table } - store, ok := n.buckets[opt.Database] + s, ok := n.buckets.Load(opt.Database) if !ok { return nil, ErrBucketNotFound } + store := s.(nats.ObjectStore) objects, err := store.List() if err != nil { @@ -399,7 +417,7 @@ func (n *natsStore) List(opts ...store.ListOption) ([]string, error) { return keys, nil } -// Close the store +// Close the store. func (n *natsStore) Close() error { n.conn.Close() return nil @@ -430,6 +448,31 @@ func (n *natsStore) createNewBucket(name string) (nats.ObjectStore, error) { if err != nil { return nil, errors.Wrapf(err, "Failed to create new bucket (%s)", name) } - n.buckets[name] = store + n.buckets.Store(name, store) return store, err } + +// thread safe way to initialize the connection. +func (n *natsStore) initConn() error { + if n.hasConn() { + return nil + } + + n.Lock() + defer n.Unlock() + + // check if conn was initialized meanwhile + if n.conn != nil { + return nil + } + + return n.Init() +} + +// thread safe way to check if n is initialized. +func (n *natsStore) hasConn() bool { + n.RLock() + defer n.RUnlock() + + return n.conn != nil +} diff --git a/vendor/github.com/go-micro/plugins/v4/store/nats-js/options.go b/vendor/github.com/go-micro/plugins/v4/store/nats-js/options.go index 53a31b110a..21719e2ab6 100644 --- a/vendor/github.com/go-micro/plugins/v4/store/nats-js/options.go +++ b/vendor/github.com/go-micro/plugins/v4/store/nats-js/options.go @@ -7,7 +7,7 @@ import ( "go-micro.dev/v4/store" ) -// store.Option +// store.Option. type natsOptionsKey struct{} type jsOptionsKey struct{} type objOptionsKey struct{} @@ -15,15 +15,15 @@ type ttlOptionsKey struct{} type memoryOptionsKey struct{} type descriptionOptionsKey struct{} -// store.DeleteOption +// store.DeleteOption. type delBucketOptionsKey struct{} -// NatsOptions accepts nats.Options +// NatsOptions accepts nats.Options. func NatsOptions(opts nats.Options) store.Option { return setStoreOption(natsOptionsKey{}, opts) } -// JetStreamOptions accepts multiple nats.JSOpt +// JetStreamOptions accepts multiple nats.JSOpt. func JetStreamOptions(opts ...nats.JSOpt) store.Option { return setStoreOption(jsOptionsKey{}, opts) } @@ -35,34 +35,42 @@ func ObjectStoreOptions(cfg ...*nats.ObjectStoreConfig) store.Option { } // DefaultTTL sets the default TTL to use for new buckets -// By default no TTL is set. +// +// By default no TTL is set. // // TTL ON INDIVIDUAL WRITE CALLS IS NOT SUPPORTED, only bucket wide TTL. // Either set a default TTL with this option or provide bucket specific options -// with ObjectStoreOptions +// +// with ObjectStoreOptions func DefaultTTL(ttl time.Duration) store.Option { return setStoreOption(ttlOptionsKey{}, ttl) } // DefaultMemory sets the default storage type to memory only. // -// The default is file storage, persisting storage between service restarts. +// The default is file storage, persisting storage between service restarts. +// // Be aware that the default storage location of NATS the /tmp dir is, and thus -// won't persist reboots. +// +// won't persist reboots. func DefaultMemory() store.Option { return setStoreOption(memoryOptionsKey{}, nats.MemoryStorage) } // DefaultDescription sets the default description to use when creating new -// buckets. The default is "Store managed by go-micro" +// +// buckets. The default is "Store managed by go-micro" func DefaultDescription(text string) store.Option { return setStoreOption(descriptionOptionsKey{}, text) } // DeleteBucket will use the key passed to Delete as a bucket (database) name, -// and delete the bucket. +// +// and delete the bucket. +// // This option should not be combined with the store.DeleteFrom option, as -// that will overwrite the delete action. +// +// that will overwrite the delete action. func DeleteBucket() store.DeleteOption { return func(d *store.DeleteOptions) { d.Table = "DELETE_BUCKET" diff --git a/vendor/modules.txt b/vendor/modules.txt index e2cc47604b..461e74ee84 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -929,7 +929,7 @@ github.com/go-micro/plugins/v4/server/grpc # github.com/go-micro/plugins/v4/server/http v1.2.2 ## explicit; go 1.17 github.com/go-micro/plugins/v4/server/http -# github.com/go-micro/plugins/v4/store/nats-js v1.1.0 +# github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20230807070816-bc05fb076ce7 => github.com/kobergj/plugins/v4/store/nats-js v1.2.1-0.20231020092801-9463c820c19a ## explicit; go 1.17 github.com/go-micro/plugins/v4/store/nats-js # github.com/go-micro/plugins/v4/store/redis v1.2.1-0.20230510195111-07cd57e1bc9d @@ -2259,3 +2259,4 @@ stash.kopano.io/kgol/oidc-go # stash.kopano.io/kgol/rndm v1.1.2 ## explicit; go 1.13 stash.kopano.io/kgol/rndm +# github.com/go-micro/plugins/v4/store/nats-js => github.com/kobergj/plugins/v4/store/nats-js v1.2.1-0.20231020092801-9463c820c19a