diff --git a/changelog/unreleased/natsjs-registry.md b/changelog/unreleased/natsjs-registry.md new file mode 100644 index 000000000..609c255f5 --- /dev/null +++ b/changelog/unreleased/natsjs-registry.md @@ -0,0 +1,6 @@ +Enhancement: Introduce natsjs registry + +Introduce a registry based on the natsjs object store + +https://github.com/owncloud/ocis/issues/7272 +https://github.com/owncloud/ocis/pull/7487 diff --git a/ocis-pkg/natsjsregistry/options.go b/ocis-pkg/natsjsregistry/options.go new file mode 100644 index 000000000..dfcda616a --- /dev/null +++ b/ocis-pkg/natsjsregistry/options.go @@ -0,0 +1,32 @@ +package natsjsregistry + +import ( + "context" + "time" + + "go-micro.dev/v4/registry" + "go-micro.dev/v4/store" +) + +type storeOptionsKey struct{} +type expiryKey struct{} + +// StoreOptions sets the options for the underlying store +func StoreOptions(opts []store.Option) registry.Option { + return func(o *registry.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, storeOptionsKey{}, opts) + } +} + +// ServiceExpiry allows setting an expiry time for service registrations +func ServiceExpiry(t time.Duration) registry.Option { + return func(o *registry.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, expiryKey{}, t) + } +} diff --git a/ocis-pkg/natsjsregistry/registry.go b/ocis-pkg/natsjsregistry/registry.go new file mode 100644 index 000000000..5f1e479fc --- /dev/null +++ b/ocis-pkg/natsjsregistry/registry.go @@ -0,0 +1,137 @@ +// Package natsjsregistry implements a registry using natsjs object store +package natsjsregistry + +import ( + "context" + "encoding/json" + "errors" + "time" + + natsjs "github.com/go-micro/plugins/v4/store/nats-js" + "go-micro.dev/v4/registry" + "go-micro.dev/v4/store" + "go-micro.dev/v4/util/cmd" +) + +var _registryName = "natsjs" + +func init() { + cmd.DefaultRegistries[_registryName] = NewRegistry +} + +// NewRegistry returns a new natsjs registry +func NewRegistry(opts ...registry.Option) registry.Registry { + options := registry.Options{ + Context: context.Background(), + } + for _, o := range opts { + o(&options) + } + exp, _ := options.Context.Value(expiryKey{}).(time.Duration) + return &storeregistry{ + opts: options, + store: natsjs.NewStore(storeOptions(options)...), + typ: _registryName, + expiry: exp, + } +} + +type storeregistry struct { + opts registry.Options + store store.Store + typ string + expiry time.Duration +} + +// Init inits the registry +func (n *storeregistry) Init(opts ...registry.Option) error { + for _, o := range opts { + o(&n.opts) + } + return n.store.Init(storeOptions(n.opts)...) +} + +// Options returns the configured options +func (n *storeregistry) Options() registry.Options { + return n.opts +} + +// Register adds a service to the registry +func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOption) error { + if s == nil { + return errors.New("wont store nil service") + } + b, err := json.Marshal(s) + if err != nil { + return err + } + return n.store.Write(&store.Record{ + Key: s.Name, + Value: b, + Expiry: n.expiry, + }) +} + +// Deregister removes a service from the registry +func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error { + return n.store.Delete(s.Name) +} + +// GetService gets a specific service from the registry +func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) { + recs, err := n.store.Read(s) + if err != nil { + return nil, err + } + svcs := make([]*registry.Service, 0, len(recs)) + for _, rec := range recs { + var s registry.Service + if err := json.Unmarshal(rec.Value, &s); err != nil { + return nil, err + } + svcs = append(svcs, &s) + } + return svcs, nil +} + +// ListServices lists all registered services +func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) { + keys, err := n.store.List() + if err != nil { + return nil, err + } + + var svcs []*registry.Service + for _, k := range keys { + s, err := n.GetService(k) + if err != nil { + // TODO: continue ? + return nil, err + } + svcs = append(svcs, s...) + + } + return svcs, nil +} + +// Watch allowes following the changes in the registry if it would be implemented +func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) { + return nil, errors.New("watcher not implemented") +} + +// String returns the name of the registry +func (n *storeregistry) String() string { + return n.typ +} + +func storeOptions(opts registry.Options) []store.Option { + storeoptions := []store.Option{ + store.Nodes(opts.Addrs...), + store.Database("service-registry"), + store.Table("service-registry"), + } + if so, ok := opts.Context.Value(storeOptionsKey{}).([]store.Option); ok { + storeoptions = append(storeoptions, so...) + } + return storeoptions +} diff --git a/ocis-pkg/registry/registry.go b/ocis-pkg/registry/registry.go index dd2245cfa..ed9ece2ae 100644 --- a/ocis-pkg/registry/registry.go +++ b/ocis-pkg/registry/registry.go @@ -13,6 +13,7 @@ import ( mdnsr "github.com/go-micro/plugins/v4/registry/mdns" memr "github.com/go-micro/plugins/v4/registry/memory" natsr "github.com/go-micro/plugins/v4/registry/nats" + "github.com/owncloud/ocis/v2/ocis-pkg/natsjsregistry" mRegistry "go-micro.dev/v4/registry" "go-micro.dev/v4/registry/cache" ) @@ -28,6 +29,7 @@ var ( reg mRegistry.Registry ) +// Configure configures the registry func Configure(plugin string) { if reg == nil { regPlugin = plugin @@ -66,6 +68,10 @@ func GetRegistry() mRegistry.Registry { ) case "memory": reg = memr.NewRegistry() + case "natsjs": + reg = natsjsregistry.NewRegistry( + mRegistry.Addrs(addresses...), + ) default: reg = mdnsr.NewRegistry() }