diff --git a/changelog/unreleased/fix-natsjskv-registry-2.md b/changelog/unreleased/fix-natsjskv-registry-2.md index 4649c8376b..d64f9f9113 100644 --- a/changelog/unreleased/fix-natsjskv-registry-2.md +++ b/changelog/unreleased/fix-natsjskv-registry-2.md @@ -1,6 +1,8 @@ Bugfix: Repair nats-js-kv registry The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it. +Internally, it can now distinguish services by version and will aggregate all nodes of the same version into a single service, as expected by the registry cache and watcher. +https://github.com/owncloud/ocis/pull/9734 https://github.com/owncloud/ocis/pull/9726 https://github.com/owncloud/ocis/pull/9656 diff --git a/ocis-pkg/natsjsregistry/registry.go b/ocis-pkg/natsjsregistry/registry.go index b66896f168..78cc3b1b54 100644 --- a/ocis-pkg/natsjsregistry/registry.go +++ b/ocis-pkg/natsjsregistry/registry.go @@ -95,7 +95,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO return err } return n.store.Write(&store.Record{ - Key: s.Name + _serviceDelimiter + server.DefaultId, + Key: s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version, Value: b, Expiry: options.TTL, }) @@ -105,7 +105,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error { n.lock.RLock() defer n.lock.RUnlock() - return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId) + return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version) } // GetService gets a specific service from the registry @@ -138,20 +138,28 @@ func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Serv return nil, err } - svcs := make([]*registry.Service, 0, len(keys)) + versions := map[string]*registry.Service{} for _, k := range keys { - s, err := n.getService(k) + s, err := n.getNode(k) if err != nil { // TODO: continue ? return nil, err } + if versions[s.Version] == nil { + versions[s.Version] = s + } else { + versions[s.Version].Nodes = append(versions[s.Version].Nodes, s.Nodes...) + } + } + svcs := make([]*registry.Service, 0, len(versions)) + for _, s := range versions { svcs = append(svcs, s) - } return svcs, nil } -func (n *storeregistry) getService(s string) (*registry.Service, error) { +// getNode retrieves a node from the store. It returns a service to also keep track of the version. +func (n *storeregistry) getNode(s string) (*registry.Service, error) { recs, err := n.store.Read(s) if err != nil { return nil, err diff --git a/ocis-pkg/natsjsregistry/watcher.go b/ocis-pkg/natsjsregistry/watcher.go index 867d726261..9376c5ae42 100644 --- a/ocis-pkg/natsjsregistry/watcher.go +++ b/ocis-pkg/natsjsregistry/watcher.go @@ -3,6 +3,7 @@ package natsjsregistry import ( "encoding/json" "errors" + "strings" natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv" "github.com/nats-io/nats.go" @@ -48,9 +49,21 @@ func (w *Watcher) Next() (*registry.Result, error) { } var svc registry.Service - if err := json.Unmarshal(kve.Value.Data, &svc); err != nil { - _ = w.stop() - return nil, err + if kve.Value.Data == nil { + // fake a service + parts := strings.SplitN(kve.Value.Key, _serviceDelimiter, 3) + if len(parts) != 3 { + return nil, errors.New("invalid service key") + } + svc.Name = parts[0] + // ocis registers nodes with a - separator + svc.Nodes = []*registry.Node{{Id: parts[0] + "-" + parts[1]}} + svc.Version = parts[2] + } else { + if err := json.Unmarshal(kve.Value.Data, &svc); err != nil { + _ = w.stop() + return nil, err + } } return ®istry.Result{ diff --git a/ocis-pkg/registry/service.go b/ocis-pkg/registry/service.go index 65c62e8d26..aebccc7520 100644 --- a/ocis-pkg/registry/service.go +++ b/ocis-pkg/registry/service.go @@ -65,6 +65,7 @@ func BuildHTTPService(serviceID, address string, version string) *mRegistry.Serv } node := &mRegistry.Node{ + // This id is read by the registry watcher Id: serviceID + "-" + server.DefaultId, Address: net.JoinHostPort(addr, fmt.Sprint(port)), Metadata: make(map[string]string),