From 6ee4a084a2862a2393f217b478a95f986909d50b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villafa=C3=B1ez?= Date: Fri, 16 Sep 2022 15:42:47 +0200 Subject: [PATCH] Use go-micro store to cache the roles (#4337) * Use go-micro store to cache the roles Add custom in-memory implementation * replace redis with custom etcd implementation * adjust table name for the cache in the roles manager * Fix tests * Fix sonarcloud issues * Refactor for sonarcloud * Allow configuration of cache per service * Reuse parent context in etcd implementation --- go.mod | 4 +- go.sum | 2 - ocis-pkg/config/config.go | 5 +- ocis-pkg/config/parser/parse.go | 13 + ocis-pkg/roles/manager.go | 50 +- ocis-pkg/roles/option.go | 30 +- ocis-pkg/shared/shared_types.go | 9 +- ocis-pkg/store/etcd/etcd.go | 531 ++++++ ocis-pkg/store/etcd/utils.go | 65 + ocis-pkg/store/memory/memstore.go | 513 ++++++ ocis-pkg/store/memory/memstore_test.go | 1506 +++++++++++++++++ ocis-pkg/store/memory/multimemstore.go | 158 ++ ocis-pkg/store/memory/multimemstore_test.go | 172 ++ ocis-pkg/store/memory/utils.go | 63 + ocis-pkg/store/store.go | 91 + services/graph/pkg/config/cachestore.go | 8 + services/graph/pkg/config/config.go | 7 +- .../pkg/config/defaults/defaultconfig.go | 10 + services/graph/pkg/service/v0/service.go | 10 +- services/ocs/pkg/config/cachestore.go | 8 + services/ocs/pkg/config/config.go | 7 +- .../ocs/pkg/config/defaults/defaultconfig.go | 10 + services/ocs/pkg/service/v0/service.go | 10 +- 23 files changed, 3238 insertions(+), 44 deletions(-) create mode 100644 ocis-pkg/store/etcd/etcd.go create mode 100644 ocis-pkg/store/etcd/utils.go create mode 100644 ocis-pkg/store/memory/memstore.go create mode 100644 ocis-pkg/store/memory/memstore_test.go create mode 100644 ocis-pkg/store/memory/multimemstore.go create mode 100644 ocis-pkg/store/memory/multimemstore_test.go create mode 100644 ocis-pkg/store/memory/utils.go create mode 100644 ocis-pkg/store/store.go create mode 100644 
services/graph/pkg/config/cachestore.go create mode 100644 services/ocs/pkg/config/cachestore.go diff --git a/go.mod b/go.mod index 116b5d4bf0..605a5af9a9 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/Masterminds/semver v1.5.0 github.com/MicahParks/keyfunc v1.2.2 github.com/ReneKroon/ttlcache/v2 v2.11.0 + github.com/armon/go-radix v1.0.0 github.com/blevesearch/bleve/v2 v2.3.4 github.com/blevesearch/bleve_index_api v1.0.3 github.com/coreos/go-oidc/v3 v3.4.0 @@ -65,6 +66,7 @@ require ( github.com/xhit/go-simple-mail/v2 v2.11.0 go-micro.dev/v4 v4.8.1 go.etcd.io/bbolt v1.3.6 + go.etcd.io/etcd/client/v3 v3.5.2 go.opencensus.io v0.23.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.34.0 go.opentelemetry.io/otel v1.10.0 @@ -101,7 +103,6 @@ require ( github.com/alexedwards/argon2id v0.0.0-20211130144151-3585854a6387 // indirect github.com/amoghe/go-crypt v0.0.0-20220222110647-20eada5f5964 // indirect github.com/armon/go-metrics v0.3.10 // indirect - github.com/armon/go-radix v1.0.0 // indirect github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect github.com/aws/aws-sdk-go v1.44.94 // indirect github.com/beevik/etree v1.1.0 // indirect @@ -262,7 +263,6 @@ require ( github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.etcd.io/etcd/api/v3 v3.5.2 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.2 // indirect - go.etcd.io/etcd/client/v3 v3.5.2 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.19.1 // indirect diff --git a/go.sum b/go.sum index 01d71ed623..f0a14a3480 100644 --- a/go.sum +++ b/go.sum @@ -292,8 +292,6 @@ github.com/crewjam/saml v0.4.6 h1:XCUFPkQSJLvzyl4cW9OvpWUbRf0gE7VUpU8ZnilbeM4= github.com/crewjam/saml v0.4.6/go.mod h1:ZBOXnNPFzB3CgOkRm7Nd6IVdkG+l/wF+0ZXLqD96t1A= github.com/cs3org/go-cs3apis v0.0.0-20220818202316-e92afdddac6d h1:toyZ7IsXlUdEPZ/IG8fg7hbM8HcLPY0bkX4FKBmgLVI= github.com/cs3org/go-cs3apis 
v0.0.0-20220818202316-e92afdddac6d/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.10.1-0.20220915071600-3358dc72a980 h1:siIHxgMHWCxERsHPYwHL7Pno27URyqjV2np9Mh1U84g= -github.com/cs3org/reva/v2 v2.10.1-0.20220915071600-3358dc72a980/go.mod h1:+BYVpRV8g1hL8wF3+3BunL9BKPsXVyJYmH8COxq/V7Y= github.com/cs3org/reva/v2 v2.10.1-0.20220915095422-4b099c09a66c h1:pvbsnSl5WpS6PkSR4glwR8OJGrRdZASajAJtNwp9E+Y= github.com/cs3org/reva/v2 v2.10.1-0.20220915095422-4b099c09a66c/go.mod h1:+BYVpRV8g1hL8wF3+3BunL9BKPsXVyJYmH8COxq/V7Y= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= diff --git a/ocis-pkg/config/config.go b/ocis-pkg/config/config.go index 2fd90f9506..dd772c2357 100644 --- a/ocis-pkg/config/config.go +++ b/ocis-pkg/config/config.go @@ -57,8 +57,9 @@ type Runtime struct { type Config struct { *shared.Commons `yaml:"shared"` - Tracing *shared.Tracing `yaml:"tracing"` - Log *shared.Log `yaml:"log"` + Tracing *shared.Tracing `yaml:"tracing"` + Log *shared.Log `yaml:"log"` + CacheStore *shared.CacheStore `yaml:"cache_store"` Mode Mode // DEPRECATED File string diff --git a/ocis-pkg/config/parser/parse.go b/ocis-pkg/config/parser/parse.go index 81e1ede7b2..3ffe25b075 100644 --- a/ocis-pkg/config/parser/parse.go +++ b/ocis-pkg/config/parser/parse.go @@ -48,6 +48,9 @@ func EnsureDefaults(cfg *config.Config) { if cfg.TokenManager == nil { cfg.TokenManager = &shared.TokenManager{} } + if cfg.CacheStore == nil { + cfg.CacheStore = &shared.CacheStore{} + } } // EnsureCommons copies applicable parts of the oCIS config into the commons part @@ -81,6 +84,16 @@ func EnsureCommons(cfg *config.Config) { cfg.Commons.Tracing = &shared.Tracing{} } + if cfg.CacheStore != nil { + cfg.Commons.CacheStore = &shared.CacheStore{ + Type: cfg.CacheStore.Type, + Address: cfg.CacheStore.Address, + Size: cfg.CacheStore.Size, + } + } else { + cfg.Commons.CacheStore = &shared.CacheStore{} + } + // copy 
token manager to the commons part if set if cfg.TokenManager != nil { cfg.Commons.TokenManager = cfg.TokenManager diff --git a/ocis-pkg/roles/manager.go b/ocis-pkg/roles/manager.go index f6a59758a6..651970a925 100644 --- a/ocis-pkg/roles/manager.go +++ b/ocis-pkg/roles/manager.go @@ -2,16 +2,26 @@ package roles import ( "context" + "time" "github.com/owncloud/ocis/v2/ocis-pkg/log" + ocisstore "github.com/owncloud/ocis/v2/ocis-pkg/store" settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0" settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" + "go-micro.dev/v4/store" + "google.golang.org/protobuf/encoding/protojson" +) + +const ( + cacheDatabase = "ocis-pkg" + cacheTableName = "ocis-pkg/roles" + cacheTTL = time.Hour ) // Manager manages a cache of roles by fetching unknown roles from the settings.RoleService. type Manager struct { logger log.Logger - cache cache + cache store.Store roleService settingssvc.RoleService } @@ -19,8 +29,9 @@ type Manager struct { func NewManager(o ...Option) Manager { opts := newOptions(o...) 
+ nStore := ocisstore.GetStore(opts.storeOptions) return Manager{ - cache: newCache(opts.size, opts.ttl), + cache: nStore, roleService: opts.roleService, } } @@ -31,10 +42,26 @@ func (m *Manager) List(ctx context.Context, roleIDs []string) []*settingsmsg.Bun result := make([]*settingsmsg.Bundle, 0) lookup := make([]string, 0) for _, roleID := range roleIDs { - if hit := m.cache.get(roleID); hit == nil { + if records, err := m.cache.Read(roleID, store.ReadFrom(cacheDatabase, cacheTableName)); err != nil { lookup = append(lookup, roleID) } else { - result = append(result, hit) + role := &settingsmsg.Bundle{} + found := false + for _, record := range records { + if record.Key == roleID { + if err := protojson.Unmarshal(record.Value, role); err == nil { + // if we can unmarshal the role, append it to the result + // otherwise assume the role wasn't found (data was damaged and + // we need to get the role again) + result = append(result, role) + found = true + break + } + } + } + if !found { + lookup = append(lookup, roleID) + } } } @@ -49,7 +76,20 @@ func (m *Manager) List(ctx context.Context, roleIDs []string) []*settingsmsg.Bun return nil } for _, role := range res.Bundles { - m.cache.set(role.Id, role) + jsonbytes, _ := protojson.Marshal(role) + record := &store.Record{ + Key: role.Id, + Value: jsonbytes, + Expiry: cacheTTL, + } + err := m.cache.Write( + record, + store.WriteTo(cacheDatabase, cacheTableName), + store.WriteTTL(cacheTTL), + ) + if err != nil { + m.logger.Debug().Err(err).Msg("failed to cache roles") + } result = append(result, role) } } diff --git a/ocis-pkg/roles/option.go b/ocis-pkg/roles/option.go index 2e8790f3fb..772f0ef647 100644 --- a/ocis-pkg/roles/option.go +++ b/ocis-pkg/roles/option.go @@ -1,37 +1,21 @@ package roles import ( - "time" - "github.com/owncloud/ocis/v2/ocis-pkg/log" + ocisstore "github.com/owncloud/ocis/v2/ocis-pkg/store" settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" ) // Options are all the 
possible options. type Options struct { - size int - ttl time.Duration - logger log.Logger - roleService settingssvc.RoleService + storeOptions ocisstore.OcisStoreOptions + logger log.Logger + roleService settingssvc.RoleService } // Option mutates option type Option func(*Options) -// CacheSize configures the size of the cache in items. -func CacheSize(s int) Option { - return func(o *Options) { - o.size = s - } -} - -// CacheTTL rebuilds the cache after the configured duration. -func CacheTTL(ttl time.Duration) Option { - return func(o *Options) { - o.ttl = ttl - } -} - // Logger sets a preconfigured logger func Logger(logger log.Logger) Option { return func(o *Options) { @@ -46,6 +30,12 @@ func RoleService(rs settingssvc.RoleService) Option { } } +func StoreOptions(storeOpts ocisstore.OcisStoreOptions) Option { + return func(o *Options) { + o.storeOptions = storeOpts + } +} + func newOptions(opts ...Option) Options { o := Options{} diff --git a/ocis-pkg/shared/shared_types.go b/ocis-pkg/shared/shared_types.go index d633f79bfe..5e78eaa3b6 100644 --- a/ocis-pkg/shared/shared_types.go +++ b/ocis-pkg/shared/shared_types.go @@ -34,11 +34,18 @@ type Reva struct { Address string `yaml:"address" env:"REVA_GATEWAY" desc:"The CS3 gateway endpoint."` } -// Commons holds configuration that are common to all services. Each service can then decide whether +type CacheStore struct { + Type string `yaml:"type" env:"OCIS_CACHE_STORE_TYPE" desc:"The type of the cache store. Valid options are \"noop\", \"ocmem\", \"etcd\" and \"memory\""` + Address string `yaml:"address" env:"OCIS_CACHE_STORE_ADDRESS" desc:"a comma-separated list of addresses to connect to. Only for etcd"` + Size int `yaml:"size" env:"OCIS_CACHE_STORE_SIZE" desc:"Maximum size for the cache store. Only ocmem will use this option, in number of items per table. The rest will ignore the option and can grow indefinitely"` +} + +// Commons holds configuration that are common to all extensions. 
Each extension can then decide whether // to overwrite its values. type Commons struct { Log *Log `yaml:"log"` Tracing *Tracing `yaml:"tracing"` + CacheStore *CacheStore `yaml:"cache_store"` OcisURL string `yaml:"ocis_url" env:"OCIS_URL" desc:"URL, where oCIS is reachable for users."` TokenManager *TokenManager `mask:"struct" yaml:"token_manager"` Reva *Reva `yaml:"reva"` diff --git a/ocis-pkg/store/etcd/etcd.go b/ocis-pkg/store/etcd/etcd.go new file mode 100644 index 0000000000..d5b7c70809 --- /dev/null +++ b/ocis-pkg/store/etcd/etcd.go @@ -0,0 +1,531 @@ +package etcd + +import ( + "context" + "encoding/json" + "strings" + "time" + + "go-micro.dev/v4/store" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/namespace" +) + +const ( + prefixNS = ".prefix" + suffixNS = ".suffix" +) + +type EtcdStore struct { + options store.Options + client *clientv3.Client +} + +// Create a new go-micro store backed by etcd +func NewEtcdStore(opts ...store.Option) store.Store { + es := &EtcdStore{} + _ = es.Init(opts...) + return es +} + +func (es *EtcdStore) getCtx() (context.Context, context.CancelFunc) { + currentCtx := es.options.Context + if currentCtx == nil { + currentCtx = context.TODO() + } + ctx, cancel := context.WithTimeout(currentCtx, 10*time.Second) + return ctx, cancel +} + +// Setup the etcd client based on the current options. The old client (if any) +// will be closed. +// Currently, only the etcd nodes are configurable. If no node is provided, +// it will use the "127.0.0.1:2379" node. +// Context timeout is setup to 10 seconds, and dial timeout to 2 seconds +func (es *EtcdStore) setupClient() { + if es.client != nil { + es.client.Close() + } + + endpoints := []string{"127.0.0.1:2379"} + if len(es.options.Nodes) > 0 { + endpoints = es.options.Nodes + } + + cli, _ := clientv3.New(clientv3.Config{ + DialTimeout: 2 * time.Second, + Endpoints: endpoints, + }) + + es.client = cli +} + +// Initialize the go-micro store implementation. 
+// Currently, only the nodes are configurable, the rest of the options +// will be ignored. +func (es *EtcdStore) Init(opts ...store.Option) error { + optList := store.Options{} + for _, opt := range opts { + opt(&optList) + } + + es.options = optList + es.setupClient() + return nil +} + +// Get the store options +func (es *EtcdStore) Options() store.Options { + return es.options +} + +// Get the effective TTL, as int64 number of seconds. It will prioritize +// the TTL set in the options, then the expiry time in the options, and +// finally the one set as part of the record +func getEffectiveTTL(r *store.Record, opts store.WriteOptions) int64 { + // set base ttl duration and expiration time based on the record + duration := r.Expiry + + // overwrite ttl duration and expiration time based on options + if !opts.Expiry.IsZero() { + // options.Expiry is a time.Time, newRecord.Expiry is a time.Duration + duration = time.Until(opts.Expiry) + } + + // TTL option takes precedence over expiration time + if opts.TTL != 0 { + duration = opts.TTL + } + + // use milliseconds because it returns an int64 instead of a float64 + return duration.Milliseconds() / 1000 +} + +// Write the record into the etcd. The record will be duplicated in order to +// find it by prefix or by suffix. This means that it will take double space. +// Note that this is an implementation detail and it will be handled +// transparently. +// +// Database and Table options will be used to provide a different prefix to +// the key. Each service using this store should use a different database+table +// combination in order to prevent key collisions. +// +// Due to how TTLs are implemented in etcd, the minimum valid TTL seems to +// be 2 secs. Using lower values or even negative values will force the etcd +// server to use the minimum value instead. +// In addition, getting a lease for the TTL and attach it to the target key +// are 2 different operations that can't be sent as part of a transaction. 
+// This means that it's possible to get a lease and have that lease expire +// before attaching it to the key. Errors are expected to happen if this is +// the case, and no key will be inserted. +// According to etcd documentation, the key is guaranteed to be available +// AT LEAST the TTL duration. This means that the key might be available for +// a longer period of time in special circumstances. +// +// It's recommended to use a minimum TTL of 10 secs or higher (or not to use +// TTL) in order to prevent problematic scenarios. +func (es *EtcdStore) Write(r *store.Record, opts ...store.WriteOption) error { + wopts := store.WriteOptions{} + for _, opt := range opts { + opt(&wopts) + } + + prefix := buildPrefix(wopts.Database, wopts.Table, prefixNS) + suffix := buildPrefix(wopts.Database, wopts.Table, suffixNS) + + kv := es.client.KV + + jsonRecord, err := json.Marshal(r) + if err != nil { + return err + } + jsonStringRecord := string(jsonRecord) + + effectiveTTL := getEffectiveTTL(r, wopts) + var opOpts []clientv3.OpOption + + if effectiveTTL != 0 { + lease := es.client.Lease + ctx, cancel := es.getCtx() + gResp, gErr := lease.Grant(ctx, getEffectiveTTL(r, wopts)) + cancel() + if gErr != nil { + return gErr + } + opOpts = []clientv3.OpOption{clientv3.WithLease(gResp.ID)} + } else { + opOpts = []clientv3.OpOption{clientv3.WithLease(0)} + } + + ctx, cancel := es.getCtx() + _, err = kv.Txn(ctx).Then( + clientv3.OpPut(prefix+r.Key, jsonStringRecord, opOpts...), + clientv3.OpPut(suffix+reverseString(r.Key), jsonStringRecord, opOpts...), + ).Commit() + cancel() + + return err +} + +// Process a Get response taking into account the provided offset +func processGetResponse(resp *clientv3.GetResponse, offset int64) ([]*store.Record, error) { + result := make([]*store.Record, 0, len(resp.Kvs)) + for index, kvs := range resp.Kvs { + if int64(index) < offset { + // skip entries before the offset + continue + } + + value := &store.Record{} + err := json.Unmarshal(kvs.Value, 
value) + if err != nil { + return nil, err + } + result = append(result, value) + } + return result, nil +} + +// Process a List response taking into account the provided offset. +// The reverse flag will be used to reverse the keys found. For example, +// "zyxw" will be reversed to "wxyz". This is used for suffix searches, +// where the keys are stored reversed and need to be changed +func processListResponse(resp *clientv3.GetResponse, offset int64, reverse bool) ([]string, error) { + result := make([]string, 0, len(resp.Kvs)) + for index, kvs := range resp.Kvs { + if int64(index) < offset { + // skip entries before the offset + continue + } + + targetKey := string(kvs.Key) + if reverse { + targetKey = reverseString(targetKey) + } + result = append(result, targetKey) + } + return result, nil +} + +// Perform an exact key read and return the result +func (es *EtcdStore) directRead(kv clientv3.KV, key string) ([]*store.Record, error) { + ctx, cancel := es.getCtx() + resp, err := kv.Get(ctx, key) + cancel() + if err != nil { + return nil, err + } + + if len(resp.Kvs) == 0 { + return nil, store.ErrNotFound + } + + return processGetResponse(resp, 0) +} + +// Perform a prefix read with limit and offset. A limit of 0 will return all +// results. Usage of offset isn't recommended because those results must still +// be fethed from the server in order to be discarded. +func (es *EtcdStore) prefixRead(kv clientv3.KV, key string, limit, offset int64) ([]*store.Record, error) { + getOptions := []clientv3.OpOption{ + clientv3.WithPrefix(), + } + if limit > 0 { + getOptions = append(getOptions, clientv3.WithLimit(limit+offset)) + } + + ctx, cancel := es.getCtx() + resp, err := kv.Get(ctx, key, getOptions...) + cancel() + if err != nil { + return nil, err + } + return processGetResponse(resp, offset) +} + +// Perform a prefix + suffix read with limit and offset. A limit of 0 will +// return all results found. 
Usage of this function is discouraged because +// we'll have to request a prefix search and match the suffix manually. This +// means that even with a limit = 3 and offset = 0, there is no guarantee +// we'll find all the results we need within that range, and we'll likely +// need to request more data from the server. The number of requests we need +// to perform is unknown and might cause load. +func (es *EtcdStore) prefixSuffixRead(kv clientv3.KV, prefix, suffix string, limit, offset int64) ([]*store.Record, error) { + firstKeyOut := firstKeyOutOfPrefixString(prefix) + getOptions := []clientv3.OpOption{ + clientv3.WithRange(firstKeyOut), + } + + if limit > 0 { + // unlikely to find all the entries we need within offset + limit + getOptions = append(getOptions, clientv3.WithLimit((limit+offset)*2)) + } + + var currentRecordOffset int64 + result := []*store.Record{} + initialKey := prefix + + keepGoing := true + for keepGoing { + ctx, cancel := es.getCtx() + resp, respErr := kv.Get(ctx, initialKey, getOptions...) + cancel() + if respErr != nil { + return nil, respErr + } + + records, err := processGetResponse(resp, 0) + if err != nil { + return nil, err + } + for _, record := range records { + if !strings.HasSuffix(record.Key, suffix) { + continue + } + + if currentRecordOffset < offset { + currentRecordOffset++ + continue + } + + if !shouldFinish(int64(len(result)), limit) { + result = append(result, record) + if shouldFinish(int64(len(result)), limit) { + break + } + } + } + if !resp.More || shouldFinish(int64(len(result)), limit) { + keepGoing = false + } else { + initialKey = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) // append byte 0 (nul char) to the last key + } + } + return result, nil +} + +// Read records from the etcd server based in the key. 
Database and Table +// options are highly recommended, otherwise we'll use a default one (which +// might not have the requested keys) +// +// If no prefix or suffix option is provided, we'll read the record matching +// the provided key. Note that a list of records will be provided anyway, +// likely with only one record (the one requested) +// +// Prefix and suffix options are supported and should perform fine even with +// a large amount of data. Note that the limit option should also be included +// in order to limit the amount of records we need to fetch. +// +// Note that using both prefix and suffix options at the same time is possible +// but discouraged. A prefix search will be send to the etcd server, and from +// there we'll manually pick the records matching the suffix. This might become +// very inefficient since we might need to request more data to the etcd +// multiple times in order to provide the results asked. +// Usage of the offset option is also discouraged because we'll have to request +// records that we'll have to skip manually on our side. +// +// Don't rely on any particular order of the keys. The records are expected to +// be sorted by key except if the suffix option (suffix without prefix) is +// used. 
In this case, the keys will be sorted based on the reversed key +func (es *EtcdStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + ropts := store.ReadOptions{} + for _, opt := range opts { + opt(&ropts) + } + + prefix := buildPrefix(ropts.Database, ropts.Table, prefixNS) + suffix := buildPrefix(ropts.Database, ropts.Table, suffixNS) + + kv := es.client.KV + preKv := namespace.NewKV(kv, prefix) + sufKv := namespace.NewKV(kv, suffix) + + if ropts.Prefix && ropts.Suffix { + return es.prefixSuffixRead(preKv, key, key, int64(ropts.Limit), int64(ropts.Offset)) + } + + if ropts.Prefix { + return es.prefixRead(preKv, key, int64(ropts.Limit), int64(ropts.Offset)) + } + + if ropts.Suffix { + return es.prefixRead(sufKv, reverseString(key), int64(ropts.Limit), int64(ropts.Offset)) + } + + return es.directRead(preKv, key) +} + +// Delete the record containing the key provided. Database and Table +// options are highly recommended, otherwise we'll use a default one (which +// might not have the requested keys) +// +// Since the Write method inserts 2 entries for a given key, those both +// entries will also be removed using the same key. This is handled +// transparently. +func (es *EtcdStore) Delete(key string, opts ...store.DeleteOption) error { + dopts := store.DeleteOptions{} + for _, opt := range opts { + opt(&dopts) + } + + prefix := buildPrefix(dopts.Database, dopts.Table, prefixNS) + suffix := buildPrefix(dopts.Database, dopts.Table, suffixNS) + + kv := es.client.KV + + ctx, cancel := es.getCtx() + _, err := kv.Txn(ctx).Then( + clientv3.OpDelete(prefix+key), + clientv3.OpDelete(suffix+reverseString(key)), + ).Commit() + cancel() + + return err +} + +// List the keys based on the provided prefix. Use the empty string (and no +// limit nor offset) to list all keys available. +// Limit and offset options are available to limit the keys we need to return. +// The reverse option will reverse the keys before returning them. 
Use it when +// listing the keys from the suffix KV. +// +// Note that values for the keys won't be requested to the etcd server, that's +// why the reverse option is important +func (es *EtcdStore) listKeys(kv clientv3.KV, prefixKey string, limit, offset int64, reverse bool) ([]string, error) { + getOptions := []clientv3.OpOption{ + clientv3.WithKeysOnly(), + clientv3.WithPrefix(), + } + if limit > 0 { + getOptions = append(getOptions, clientv3.WithLimit(limit+offset)) + } + + ctx, cancel := es.getCtx() + resp, err := kv.Get(ctx, prefixKey, getOptions...) + cancel() + if err != nil { + return nil, err + } + + return processListResponse(resp, offset, reverse) +} + +// List the keys matching both prefix and suffix, with the provided limit and +// offset. Usage of this function is discouraged because we'll have to match +// the suffix manually on our side, which means we'll likely need to perform +// additional requests to the etcd server to get more results matching all the +// requirements. +func (es *EtcdStore) prefixSuffixList(kv clientv3.KV, prefix, suffix string, limit, offset int64) ([]string, error) { + firstKeyOut := firstKeyOutOfPrefixString(prefix) + getOptions := []clientv3.OpOption{ + clientv3.WithKeysOnly(), + clientv3.WithRange(firstKeyOut), + } + if firstKeyOut == "" { + // could happen of all bytes are "\xff" + getOptions = getOptions[:1] // remove the WithRange option + } + + if limit > 0 { + // unlikely to find all the entries we need within offset + limit + getOptions = append(getOptions, clientv3.WithLimit((limit+offset)*2)) + } + + var currentRecordOffset int64 + result := []string{} + initialKey := prefix + + keepGoing := true + for keepGoing { + ctx, cancel := es.getCtx() + resp, respErr := kv.Get(ctx, initialKey, getOptions...) 
+ cancel() + if respErr != nil { + return nil, respErr + } + + keys, err := processListResponse(resp, 0, false) + if err != nil { + return nil, err + } + for _, key := range keys { + if !strings.HasSuffix(key, suffix) { + continue + } + + if currentRecordOffset < offset { + currentRecordOffset++ + continue + } + + if !shouldFinish(int64(len(result)), limit) { + result = append(result, key) + if shouldFinish(int64(len(result)), limit) { + break + } + } + } + if !resp.More || shouldFinish(int64(len(result)), limit) { + keepGoing = false + } else { + initialKey = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) // append byte 0 (nul char) to the last key + } + } + return result, nil +} + +// List the keys available in the etcd server. Database and Table +// options are highly recommended, otherwise we'll use a default one (which +// might not have the requested keys) +// +// With the Database and Table options, all the keys returned will be within +// that database and table. Each service is expected to use a different +// database + table, so using those options will list only the keys used by +// that particular service. +// +// Prefix and suffix options are available along with the limit and offset +// ones. +// +// Using prefix and suffix options at the same time is discourage because +// the suffix matching will be done on our side, and we'll likely need to +// perform multiple requests to get the requested results. Note that using +// just the suffix option is fine. +// In addition, using the offset option is also discouraged because we'll +// need to request additional keys that will be skipped on our side. 
+func (es *EtcdStore) List(opts ...store.ListOption) ([]string, error) { + lopts := store.ListOptions{} + for _, opt := range opts { + opt(&lopts) + } + + prefix := buildPrefix(lopts.Database, lopts.Table, prefixNS) + suffix := buildPrefix(lopts.Database, lopts.Table, suffixNS) + + kv := es.client.KV + preKv := namespace.NewKV(kv, prefix) + sufKv := namespace.NewKV(kv, suffix) + + if lopts.Prefix != "" && lopts.Suffix != "" { + return es.prefixSuffixList(preKv, lopts.Prefix, lopts.Suffix, int64(lopts.Limit), int64(lopts.Offset)) + } + + if lopts.Prefix != "" { + return es.listKeys(preKv, lopts.Prefix, int64(lopts.Limit), int64(lopts.Offset), false) + } + + if lopts.Suffix != "" { + return es.listKeys(sufKv, reverseString(lopts.Suffix), int64(lopts.Limit), int64(lopts.Offset), true) + } + + return es.listKeys(preKv, "", int64(lopts.Limit), int64(lopts.Offset), false) +} + +// Close the client +func (es *EtcdStore) Close() error { + return es.client.Close() +} + +// Return the service name +func (es *EtcdStore) String() string { + return "Etcd" +} diff --git a/ocis-pkg/store/etcd/utils.go b/ocis-pkg/store/etcd/utils.go new file mode 100644 index 0000000000..c88c425015 --- /dev/null +++ b/ocis-pkg/store/etcd/utils.go @@ -0,0 +1,65 @@ +package etcd + +import ( + "strings" +) + +// Returns true if the limit isn't 0 AND is greater or equal to the number +// of results. +// If the limit is 0 or the number of items is less than the number of items, +// it will return false +func shouldFinish(numberOfResults, limit int64) bool { + if limit == 0 || numberOfResults < limit { + return false + } + return true +} + +// Return the first key out of the prefix represented by the parameter, +// as a byte sequence. 
Note that it applies to byte sequences and not +// rune sequences, so it might be ill-suited for multi-byte chars +func firstKeyOutOfPrefix(src []byte) []byte { + dst := make([]byte, len(src)) + copy(dst, src) + var i int + for i = len(dst) - 1; i >= 0; i-- { + if dst[i] < 255 { + dst[i]++ + break + } + } + return dst[:i+1] +} + +// Return the first key out of the prefix represented by the parameter. +// This function relies on the firstKeyOutOfPrefix one, which uses a byte +// sequence, so it might be ill-suited if the string contains multi-byte chars. +func firstKeyOutOfPrefixString(src string) string { + srcBytes := []byte(src) + dstBytes := firstKeyOutOfPrefix(srcBytes) + return string(dstBytes) +} + +// Reverse the string based on the containing runes +func reverseString(s string) string { + r := []rune(s) + for i, j := 0, len(r)-1; i < len(r)/2; i, j = i+1, j-1 { + r[i], r[j] = r[j], r[i] + } + return string(r) +} + +// Build a string based on the parts, to be used as a prefix. Empty string is +// expected if no part is passed as parameter. +// The string will contain all the parts separated by '/'. The last char will +// also be '/' +// +// For example `buildPrefix(P1, P2, P3)` will return "P1/P2/P3/" +func buildPrefix(parts ...string) string { + var b strings.Builder + for _, part := range parts { + b.WriteString(part) + b.WriteRune('/') + } + return b.String() +} diff --git a/ocis-pkg/store/memory/memstore.go b/ocis-pkg/store/memory/memstore.go new file mode 100644 index 0000000000..bf8e1d6b95 --- /dev/null +++ b/ocis-pkg/store/memory/memstore.go @@ -0,0 +1,513 @@ +package memory + +import ( + "container/list" + "context" + "strings" + "sync" + "time" + + "github.com/armon/go-radix" + "go-micro.dev/v4/store" +) + +// In-memory store implementation using radix tree for fast prefix and suffix +// searches. 
+// Insertions are expected to be a bit slow due to the data structures, but +// searches are expected to be fast, including exact key search, as well as +// prefix and suffix searches (based on the number of elements to be returned). +// Prefix+suffix search isn't optimized and will depend on how many items we +// need to skip. +// It's also recommended to use reasonable limits when using prefix or suffix +// searches because we'll need to traverse the data structures to provide the +// results. The traversal will stop a soon as we have the required number of +// results, so it will be faster if we use a short limit. +// +// The overall performance will depend on how the radix trees are built. +// The number of elements won't directly affect the performance but how the +// keys are dispersed. The more dispersed the keys are, the faster the search +// will be, regardless of the number of keys. This happens due to the number +// of hops we need to do to reach the target element. +// This also mean that if the keys are too similar, the performance might be +// slower than expected even if the number of elements isn't too big. +type MemStore struct { + preRadix *radix.Tree + sufRadix *radix.Tree + evictionList *list.List + + options store.Options + + lockGlob sync.RWMutex + lockEvicList sync.RWMutex // Read operation will modify the eviction list +} + +type storeRecord struct { + Key string + Value []byte + Metadata map[string]interface{} + Expiry time.Duration + ExpiresAt time.Time +} + +type contextKey string + +var targetContextKey contextKey + +// Prepare a context to be used with the memory implementation. The context +// is used to set up custom parameters to the specific implementation. +// In this case, you can configure the maximum capacity for the MemStore +// implementation as shown below. 
+// ``` +// cache := NewMemStore( +// store.WithContext( +// NewContext( +// ctx, +// map[string]interface{}{ +// "maxCap": 50, +// }, +// ), +// ), +// ) +// ``` +// +// Available options for the MemStore are: +// * "maxCap" -> 512 (int) The maximum number of elements the cache will hold. +// Adding additional elements will remove old elements to ensure we aren't over +// the maximum capacity. +// +// For convenience, this can also be used for the MultiMemStore. +func NewContext(ctx context.Context, storeParams map[string]interface{}) context.Context { + return context.WithValue(ctx, targetContextKey, storeParams) +} + +// Create a new MemStore instance +func NewMemStore(opts ...store.Option) store.Store { + m := &MemStore{} + _ = m.Init(opts...) + return m +} + +// Get the maximum capacity configured. If no maxCap has been configured +// (via `NewContext`), 512 will be used as maxCap. +func (m *MemStore) getMaxCap() int { + maxCap := 512 + + ctx := m.options.Context + if ctx == nil { + return maxCap + } + + ctxValue := ctx.Value(targetContextKey) + if ctxValue == nil { + return maxCap + } + additionalOpts := ctxValue.(map[string]interface{}) + + confCap, exists := additionalOpts["maxCap"] + if exists { + maxCap = confCap.(int) + } + return maxCap +} + +// Initialize the MemStore. If the MemStore was used, this will reset +// all the internal structures and the new options (passed as parameters) +// will be used. +func (m *MemStore) Init(opts ...store.Option) error { + optList := store.Options{} + for _, opt := range opts { + opt(&optList) + } + + m.lockGlob.Lock() + defer m.lockGlob.Unlock() + + m.preRadix = radix.New() + m.sufRadix = radix.New() + m.evictionList = list.New() + m.options = optList + + return nil +} + +// Get the options being used +func (m *MemStore) Options() store.Options { + m.lockGlob.RLock() + defer m.lockGlob.RUnlock() + + return m.options +} + +// Write the record in the MemStore. +// Note that Database and Table options will be ignored. 
+// Expiration options will take the following precedence: +// TTL option > expiration option > TTL record +// +// New elements will take the last position in the eviction list. Updating +// an element will also move the element to the last position. +// +// Although not recommended, new elements might be inserted with an +// already-expired date +func (m *MemStore) Write(r *store.Record, opts ...store.WriteOption) error { + var element *list.Element + + wopts := store.WriteOptions{} + for _, opt := range opts { + opt(&wopts) + } + cRecord := toStoreRecord(r, wopts) + + m.lockGlob.Lock() + defer m.lockGlob.Unlock() + + ele, exists := m.preRadix.Get(cRecord.Key) + if exists { + element = ele.(*list.Element) + element.Value = cRecord + + m.evictionList.MoveToBack(element) + } else { + if m.evictionList.Len() >= m.getMaxCap() { + elementToDelete := m.evictionList.Front() + if elementToDelete != nil { + recordToDelete := elementToDelete.Value.(*storeRecord) + _, _ = m.preRadix.Delete(recordToDelete.Key) + _, _ = m.sufRadix.Delete(recordToDelete.Key) + m.evictionList.Remove(elementToDelete) + } + } + element = m.evictionList.PushBack(cRecord) + _, _ = m.preRadix.Insert(cRecord.Key, element) + _, _ = m.sufRadix.Insert(reverseString(cRecord.Key), element) + } + return nil +} + +// Read the key from the MemStore. A list of records will be returned even if +// you're asking for the exact key (only one record is expected in that case). +// +// Reading the exact element will move such element to the last position of +// the eviction list. This WON'T apply for prefix and / or suffix reads. +// +// This method guarantees that no expired element will be returned. For the +// case of exact read, the element will be removed and a "not found" error +// will be returned. +// For prefix and suffix reads, all the elements that we traverse through +// will be removed. This includes the elements we need to skip as well as +// the elements that might have gotten into the the result. 
Note that the +// elements that are over the limit won't be touched +// +// All read options are supported except Database and Table. +// +// For prefix and prefix+suffix options, the records will be returned in +// alphabetical order on the keys. +// For the suffix option (just suffix, no prefix), the records will be +// returned in alphabetical order after reversing the keys. This means, +// reverse all the keys and then sort them alphabetically. This just affects +// the sorting order; the keys will be returned as expected. +// This means that ["aboz", "caaz", "ziuz"] will be sorted as ["caaz", "aboz", "ziuz"] +// for the key "z" as suffix. +// +// Note that offset are supported but not recommended. There is no direct access +// to the record X. We'd need to skip all the records until we reach the specified +// offset, which could be problematic. +// Performance for prefix and suffix searches should be good assuming we limit +// the number of results we need to return. +func (m *MemStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + var element *list.Element + + ropts := store.ReadOptions{} + for _, opt := range opts { + opt(&ropts) + } + + if !ropts.Prefix && !ropts.Suffix { + m.lockGlob.RLock() + ele, exists := m.preRadix.Get(key) + if !exists { + m.lockGlob.RUnlock() + return nil, store.ErrNotFound + } + + element = ele.(*list.Element) + record := element.Value.(*storeRecord) + if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) { + // record expired -> need to delete + m.lockGlob.RUnlock() + m.lockGlob.Lock() + defer m.lockGlob.Unlock() + + m.evictionList.Remove(element) + _, _ = m.preRadix.Delete(key) + _, _ = m.sufRadix.Delete(reverseString(key)) + return nil, store.ErrNotFound + } + + m.lockEvicList.Lock() + m.evictionList.MoveToBack(element) + m.lockEvicList.Unlock() + + foundRecords := []*store.Record{ + fromStoreRecord(record), + } + m.lockGlob.RUnlock() + + return foundRecords, nil + } + + records := 
[]*store.Record{} + expiredElements := make(map[string]*list.Element) + + m.lockGlob.RLock() + if ropts.Prefix && ropts.Suffix { + // if we need to check both prefix and suffix, go through the + // prefix tree and skip elements without the right suffix. We + // don't need to check the suffix tree because the elements + // must be in both trees + m.preRadix.WalkPrefix(key, m.radixTreeCallBackCheckSuffix(ropts.Offset, ropts.Limit, key, &records, expiredElements)) + } else { + if ropts.Prefix { + m.preRadix.WalkPrefix(key, m.radixTreeCallBack(ropts.Offset, ropts.Limit, &records, expiredElements)) + } + if ropts.Suffix { + m.sufRadix.WalkPrefix(reverseString(key), m.radixTreeCallBack(ropts.Offset, ropts.Limit, &records, expiredElements)) + } + } + m.lockGlob.RUnlock() + + // if there are expired elements, get a write lock and delete the expired elements + if len(expiredElements) > 0 { + m.lockGlob.Lock() + for key, element := range expiredElements { + m.evictionList.Remove(element) + _, _ = m.preRadix.Delete(key) + _, _ = m.sufRadix.Delete(reverseString(key)) + } + m.lockGlob.Unlock() + } + return records, nil +} + +// Remove the record based on the key. It won't return any error if it's missing +// +// Database and Table options aren't supported +func (m *MemStore) Delete(key string, opts ...store.DeleteOption) error { + m.lockGlob.Lock() + defer m.lockGlob.Unlock() + + ele, exists := m.preRadix.Get(key) + if exists { + element := ele.(*list.Element) + m.evictionList.Remove(element) + _, _ = m.preRadix.Delete(key) + _, _ = m.sufRadix.Delete(reverseString(key)) + } + return nil +} + +// List the keys currently used in the MemStore +// +// All options are supported except Database and Table +// +// For prefix and prefix+suffix options, the keys will be returned in +// alphabetical order. +// For the suffix option (just suffix, no prefix), the keys will be +// returned in alphabetical order after reversing the keys. 
This means, +// reverse all the keys and then sort them alphabetically. This just affects +// the sorting order; the keys will be returned as expected. +// This means that ["aboz", "caaz", "ziuz"] will be sorted as ["caaz", "aboz", "ziuz"] +func (m *MemStore) List(opts ...store.ListOption) ([]string, error) { + records := []string{} + expiredElements := make(map[string]*list.Element) + + lopts := store.ListOptions{} + for _, opt := range opts { + opt(&lopts) + } + + if lopts.Prefix == "" && lopts.Suffix == "" { + m.lockGlob.RLock() + m.preRadix.Walk(m.radixTreeCallBackKeysOnly(lopts.Offset, lopts.Limit, &records, expiredElements)) + m.lockGlob.RUnlock() + + // if there are expired elements, get a write lock and delete the expired elements + if len(expiredElements) > 0 { + m.lockGlob.Lock() + for key, element := range expiredElements { + m.evictionList.Remove(element) + _, _ = m.preRadix.Delete(key) + _, _ = m.sufRadix.Delete(reverseString(key)) + } + m.lockGlob.Unlock() + } + return records, nil + } + + m.lockGlob.RLock() + if lopts.Prefix != "" && lopts.Suffix != "" { + // if we need to check both prefix and suffix, go through the + // prefix tree and skip elements without the right suffix. 
We + // don't need to check the suffix tree because the elements + // must be in both trees + m.preRadix.WalkPrefix(lopts.Prefix, m.radixTreeCallBackKeysOnlyWithSuffix(lopts.Offset, lopts.Limit, lopts.Suffix, &records, expiredElements)) + } else { + if lopts.Prefix != "" { + m.preRadix.WalkPrefix(lopts.Prefix, m.radixTreeCallBackKeysOnly(lopts.Offset, lopts.Limit, &records, expiredElements)) + } + if lopts.Suffix != "" { + m.sufRadix.WalkPrefix(reverseString(lopts.Suffix), m.radixTreeCallBackKeysOnly(lopts.Offset, lopts.Limit, &records, expiredElements)) + } + } + m.lockGlob.RUnlock() + + // if there are expired elements, get a write lock and delete the expired elements + if len(expiredElements) > 0 { + m.lockGlob.Lock() + for key, element := range expiredElements { + m.evictionList.Remove(element) + _, _ = m.preRadix.Delete(key) + _, _ = m.sufRadix.Delete(reverseString(key)) + } + m.lockGlob.Unlock() + } + return records, nil +} + +func (m *MemStore) Close() error { + return nil +} + +func (m *MemStore) String() string { + return "RadixMemStore" +} + +func (m *MemStore) Len() (int, bool) { + eLen := m.evictionList.Len() + pLen := m.preRadix.Len() + sLen := m.sufRadix.Len() + if eLen == pLen && eLen == sLen { + return eLen, true + } + return 0, false +} + +func (m *MemStore) radixTreeCallBack(offset, limit uint, result *[]*store.Record, expiredElements map[string]*list.Element) radix.WalkFn { + currentIndex := new(uint) // needs to be a pointer so the value persist across callback calls + maxIndex := new(uint) // needs to be a pointer so the value persist across callback calls + *maxIndex = offset + limit + return func(key string, value interface{}) bool { + element := value.(*list.Element) + record := element.Value.(*storeRecord) + + if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) { + // record has expired -> add element to the expiredElements map + // and jump directly to the next element without increasing the index + expiredElements[record.Key] = 
element + return false + } + + if *currentIndex >= offset && (*currentIndex < *maxIndex || *maxIndex == offset) { + // if it's within expected range, add a copy to the results + *result = append(*result, fromStoreRecord(record)) + } + + *currentIndex++ + + if *currentIndex < *maxIndex || *maxIndex == offset { + return false + } + return true + } +} + +func (m *MemStore) radixTreeCallBackCheckSuffix(offset, limit uint, presuf string, result *[]*store.Record, expiredElements map[string]*list.Element) radix.WalkFn { + currentIndex := new(uint) // needs to be a pointer so the value persist across callback calls + maxIndex := new(uint) // needs to be a pointer so the value persist across callback calls + *maxIndex = offset + limit + return func(key string, value interface{}) bool { + if !strings.HasSuffix(key, presuf) { + return false + } + + element := value.(*list.Element) + record := element.Value.(*storeRecord) + + if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) { + // record has expired -> add element to the expiredElements map + // and jump directly to the next element without increasing the index + expiredElements[record.Key] = element + return false + } + + if *currentIndex >= offset && (*currentIndex < *maxIndex || *maxIndex == offset) { + *result = append(*result, fromStoreRecord(record)) + } + + *currentIndex++ + + if *currentIndex < *maxIndex || *maxIndex == offset { + return false + } + return true + } +} + +func (m *MemStore) radixTreeCallBackKeysOnly(offset, limit uint, result *[]string, expiredElements map[string]*list.Element) radix.WalkFn { + currentIndex := new(uint) // needs to be a pointer so the value persist across callback calls + maxIndex := new(uint) // needs to be a pointer so the value persist across callback calls + *maxIndex = offset + limit + return func(key string, value interface{}) bool { + element := value.(*list.Element) + record := element.Value.(*storeRecord) + + if record.Expiry != 0 && 
record.ExpiresAt.Before(time.Now()) { + // record has expired -> add element to the expiredElements map + // and jump directly to the next element without increasing the index + expiredElements[record.Key] = element + return false + } + + if *currentIndex >= offset && (*currentIndex < *maxIndex || *maxIndex == offset) { + *result = append(*result, record.Key) + } + + *currentIndex++ + + if *currentIndex < *maxIndex || *maxIndex == offset { + return false + } + return true + } +} + +func (m *MemStore) radixTreeCallBackKeysOnlyWithSuffix(offset, limit uint, presuf string, result *[]string, expiredElements map[string]*list.Element) radix.WalkFn { + currentIndex := new(uint) // needs to be a pointer so the value persist across callback calls + maxIndex := new(uint) // needs to be a pointer so the value persist across callback calls + *maxIndex = offset + limit + return func(key string, value interface{}) bool { + if !strings.HasSuffix(key, presuf) { + return false + } + + element := value.(*list.Element) + record := element.Value.(*storeRecord) + + if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) { + // record has expired -> add element to the expiredElements map + // and jump directly to the next element without increasing the index + expiredElements[record.Key] = element + return false + } + + if *currentIndex >= offset && (*currentIndex < *maxIndex || *maxIndex == offset) { + *result = append(*result, record.Key) + } + + *currentIndex++ + + if *currentIndex < *maxIndex || *maxIndex == offset { + return false + } + return true + } +} diff --git a/ocis-pkg/store/memory/memstore_test.go b/ocis-pkg/store/memory/memstore_test.go new file mode 100644 index 0000000000..28fc5ca47f --- /dev/null +++ b/ocis-pkg/store/memory/memstore_test.go @@ -0,0 +1,1506 @@ +package memory + +import ( + "context" + "encoding/hex" + "hash/fnv" + "math/rand" + "sort" + "strconv" + "strings" + "sync" + "testing" + "time" + + "sync/atomic" + + "go-micro.dev/v4/store" +) + +func 
TestWriteAndRead(t *testing.T) { + cache := NewMemStore() + data := map[string]string{ + "abaya": "v329487", + "abaaz": "v398342", + "abayakjdkj": "v989898", + "zzzz": "viaooouyenbdnya", + "abazzz": "v57869nbdnya", + "mbmbmb": "viuyenbdnya", + "zozzz": "vooouyenbdnya", + "zzaz": "viaooouyenbdnya", + "mbzzaamb": "viunya", + } + + for key, value := range data { + record := &store.Record{ + Key: key, + Value: []byte(value), + } + _ = cache.Write(record) + } + + t.Run("Plain", func(t *testing.T) { + readPlain(t, cache) + }) + t.Run("Prefix", func(t *testing.T) { + readPrefix(t, cache) + }) + t.Run("Suffix", func(t *testing.T) { + readSuffix(t, cache) + }) + t.Run("PrefixSuffix", func(t *testing.T) { + readPrefixSuffix(t, cache) + }) + t.Run("PrefixLimitOffset", func(t *testing.T) { + readPrefixLimitOffset(t, cache) + }) + t.Run("SuffixLimitOffset", func(t *testing.T) { + readSuffixLimitOffset(t, cache) + }) + t.Run("PrefixSuffixLimitOffset", func(t *testing.T) { + readPrefixSuffixLimitOffset(t, cache) + }) +} + +func readPlain(t *testing.T, cache store.Store) { + // expected data in the cache + data := map[string]string{ + "abaya": "v329487", + "abaaz": "v398342", + "abayakjdkj": "v989898", + "zzzz": "viaooouyenbdnya", + "abazzz": "v57869nbdnya", + "mbmbmb": "viuyenbdnya", + "zozzz": "vooouyenbdnya", + "zzaz": "viaooouyenbdnya", + "mbzzaamb": "viunya", + } + for key, value := range data { + records, _ := cache.Read(key) + if len(records) != 1 { + t.Fatalf("Plain read for key %s returned %d records", key, len(records)) + } + if key != records[0].Key { + t.Errorf("Plain read for key %s returned got wrong key %s", key, records[0].Key) + } + v := string(records[0].Value) + if value != v { + t.Errorf("Plain read for key %s returned different value, expected %s, got %s", key, value, v) + } + } +} + +func readPrefix(t *testing.T, cache store.Store) { + pref1 := []struct { + Key string + Value string + }{ + {Key: "abaya", Value: "v329487"}, + {Key: "abayakjdkj", Value: 
"v989898"}, + } + + pref2 := []struct { + Key string + Value string + }{ + {Key: "zozzz", Value: "vooouyenbdnya"}, + {Key: "zzaz", Value: "viaooouyenbdnya"}, + {Key: "zzzz", Value: "viaooouyenbdnya"}, + } + + records, _ := cache.Read("abaya", store.ReadPrefix()) + if len(records) != 2 { + t.Fatalf("Prefix read for \"abaya\" returned %d records, expected 2", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref1[index].Key != record.Key { + t.Errorf("Unexpected key for prefix \"abaya\", index %d, expected %s, got %s", index, pref1[index].Key, record.Key) + } + if pref1[index].Value != string(record.Value) { + t.Errorf("Unexpected value for prefix \"abaya\", index %d, expected %s, got %s", index, pref1[index].Value, record.Value) + } + } + + records, _ = cache.Read("z", store.ReadPrefix()) + if len(records) != 3 { + t.Fatalf("Prefix read for \"z\" returned %d records, expected 3", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref2[index].Key != record.Key { + t.Errorf("Unexpected key for prefix \"z\", index %d, expected %s, got %s", index, pref2[index].Key, record.Key) + } + if pref2[index].Value != string(record.Value) { + t.Errorf("Unexpected value for prefix \"z\", index %d, expected %s, got %s", index, pref2[index].Value, record.Value) + } + } +} + +func readSuffix(t *testing.T, cache store.Store) { + pref1 := []struct { + Key string + Value string + }{ + {Key: "abaaz", Value: "v398342"}, + {Key: "zzaz", Value: "viaooouyenbdnya"}, + } + pref2 := []struct { + Key string + Value string + }{ + {Key: "abaaz", Value: "v398342"}, + {Key: "zzaz", Value: "viaooouyenbdnya"}, + {Key: "abazzz", Value: "v57869nbdnya"}, + {Key: "zozzz", Value: "vooouyenbdnya"}, + {Key: "zzzz", Value: "viaooouyenbdnya"}, + } + + records, _ := cache.Read("az", store.ReadSuffix()) + if len(records) != 2 { + t.Fatalf("Suffix read for \"az\" returned %d records, expected 2", len(records)) 
+ } + for index, record := range records { + // it should be sorted alphabetically + if pref1[index].Key != record.Key { + t.Errorf("Unexpected key for suffix \"az\", index %d, expected %s, got %s", index, pref1[index].Key, record.Key) + } + if pref1[index].Value != string(record.Value) { + t.Errorf("Unexpected value for suffix \"az\", index %d, expected %s, got %s", index, pref1[index].Value, record.Value) + } + } + + records, _ = cache.Read("z", store.ReadSuffix()) + if len(records) != 5 { + t.Fatalf("Suffix read for \"z\" returned %d records, expected 5", len(records)) + } + for index, record := range records { + if pref2[index].Key != record.Key { + t.Errorf("Unexpected key for suffix \"z\", index %d, expected %s, got %s", index, pref2[index].Key, record.Key) + } + if pref2[index].Value != string(record.Value) { + t.Errorf("Unexpected value for suffix \"z\", index %d, expected %s, got %s", index, pref2[index].Value, record.Value) + } + } +} + +func readPrefixSuffix(t *testing.T, cache store.Store) { + pref1 := []struct { + Key string + Value string + }{ + {Key: "zozzz", Value: "vooouyenbdnya"}, + {Key: "zzaz", Value: "viaooouyenbdnya"}, + {Key: "zzzz", Value: "viaooouyenbdnya"}, + } + pref2 := []struct { + Key string + Value string + }{ + {Key: "mbmbmb", Value: "viuyenbdnya"}, + {Key: "mbzzaamb", Value: "viunya"}, + } + + records, _ := cache.Read("z", store.ReadPrefix(), store.ReadSuffix()) + if len(records) != 3 { + t.Fatalf("Prefix-Suffix read for \"z\" returned %d records, expected 3", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref1[index].Key != record.Key { + t.Errorf("Unexpected key for prefix-suffix \"z\", index %d, expected %s, got %s", index, pref1[index].Key, record.Key) + } + if pref1[index].Value != string(record.Value) { + t.Errorf("Unexpected value for prefix-suffix \"z\", index %d, expected %s, got %s", index, pref1[index].Value, record.Value) + } + } + + records, _ = cache.Read("mb", 
store.ReadPrefix(), store.ReadSuffix()) + if len(records) != 2 { + t.Fatalf("Prefix-Suffix read for \"mb\" returned %d records, expected 2", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref2[index].Key != record.Key { + t.Errorf("Unexpected key for prefix-suffix \"mb\", index %d, expected %s, got %s", index, pref2[index].Key, record.Key) + } + if pref2[index].Value != string(record.Value) { + t.Errorf("Unexpected value for prefix-suffix \"mb\", index %d, expected %s, got %s", index, pref2[index].Value, record.Value) + } + } +} + +func readPrefixLimitOffset(t *testing.T, cache store.Store) { + pref1 := []struct { + Key string + Value string + }{ + {Key: "abaaz", Value: "v398342"}, + {Key: "abaya", Value: "v329487"}, + } + pref2 := []struct { + Key string + Value string + }{ + {Key: "abayakjdkj", Value: "v989898"}, + {Key: "abazzz", Value: "v57869nbdnya"}, + } + + records, _ := cache.Read("aba", store.ReadPrefix(), store.ReadLimit(2)) + if len(records) != 2 { + t.Fatalf("Limit prefix read for \"aba\" returned %d records, expected 2", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref1[index].Key != record.Key { + t.Errorf("Unexpected key for limit prefix \"aba\", index %d, expected %s, got %s", index, pref1[index].Key, record.Key) + } + if pref1[index].Value != string(record.Value) { + t.Errorf("Unexpected value for limit prefix \"aba\", index %d, expected %s, got %s", index, pref1[index].Value, record.Value) + } + } + + records, _ = cache.Read("aba", store.ReadPrefix(), store.ReadLimit(2), store.ReadOffset(2)) + if len(records) != 2 { + t.Fatalf("Offset-limit prefix read for \"aba\" returned %d records, expected 2", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref2[index].Key != record.Key { + t.Errorf("Unexpected key for offset-limit prefix \"aba\", index %d, expected %s, got %s", index, 
pref2[index].Key, record.Key) + } + if pref2[index].Value != string(record.Value) { + t.Errorf("Unexpected value for offset-limit prefix \"aba\", index %d, expected %s, got %s", index, pref2[index].Value, record.Value) + } + } +} + +func readSuffixLimitOffset(t *testing.T, cache store.Store) { + pref1 := []struct { + Key string + Value string + }{ + {Key: "abaaz", Value: "v398342"}, + {Key: "zzaz", Value: "viaooouyenbdnya"}, + } + pref2 := []struct { + Key string + Value string + }{ + {Key: "abazzz", Value: "v57869nbdnya"}, + {Key: "zozzz", Value: "vooouyenbdnya"}, + } + + records, _ := cache.Read("z", store.ReadSuffix(), store.ReadLimit(2)) + if len(records) != 2 { + t.Fatalf("Limit suffix read for \"z\" returned %d records, expected 2", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref1[index].Key != record.Key { + t.Errorf("Unexpected key for limit suffix \"z\", index %d, expected %s, got %s", index, pref1[index].Key, record.Key) + } + if pref1[index].Value != string(record.Value) { + t.Errorf("Unexpected value for limit suffix \"z\", index %d, expected %s, got %s", index, pref1[index].Value, record.Value) + } + } + + records, _ = cache.Read("z", store.ReadSuffix(), store.ReadLimit(2), store.ReadOffset(2)) + if len(records) != 2 { + t.Fatalf("Offset-limit suffix read for \"z\" returned %d records, expected 2", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref2[index].Key != record.Key { + t.Errorf("Unexpected key for offset-limit suffix \"z\", index %d, expected %s, got %s", index, pref2[index].Key, record.Key) + } + if pref2[index].Value != string(record.Value) { + t.Errorf("Unexpected value for offset-limit suffix \"z\", index %d, expected %s, got %s", index, pref2[index].Value, record.Value) + } + } +} + +func readPrefixSuffixLimitOffset(t *testing.T, cache store.Store) { + pref1 := []struct { + Key string + Value string + }{ + {Key: "zzaz", 
Value: "viaooouyenbdnya"}, + {Key: "zzzz", Value: "viaooouyenbdnya"}, + } + + records, _ := cache.Read("z", store.ReadPrefix(), store.ReadSuffix(), store.ReadOffset(1), store.ReadLimit(2)) + if len(records) != 2 { + t.Fatalf("Limit suffix read for \"z\" returned %d records, expected 2", len(records)) + } + for index, record := range records { + // it should be sorted alphabetically + if pref1[index].Key != record.Key { + t.Errorf("Unexpected key for limit suffix \"z\", index %d, expected %s, got %s", index, pref1[index].Key, record.Key) + } + if pref1[index].Value != string(record.Value) { + t.Errorf("Unexpected value for limit suffix \"z\", index %d, expected %s, got %s", index, pref1[index].Value, record.Value) + } + } +} + +func TestWriteExpiryAndRead(t *testing.T) { + cache := NewMemStore() + + data := map[string]string{ + "abaya": "v329487", + "abaaz": "v398342", + "abayakjdkj": "v989898", + "zzaz": "viaooouyenbdnya", + "abazzz": "v57869nbdnya", + "mbmbmb": "viuyenbdnya", + "mbzzaamb": "viunya", + "zozzz": "vooouyenbdnya", + } + + for key, value := range data { + record := &store.Record{ + Key: key, + Value: []byte(value), + Expiry: time.Second * 1000, + } + _ = cache.Write(record) + } + + records, _ := cache.Read("zzaz") + if len(records) != 1 { + t.Fatalf("Failed read for \"zzaz\" returned %d records, expected 1", len(records)) + } + record := records[0] + if record.Expiry < 999*time.Second || record.Expiry > 1000*time.Second { + // The expiry will be adjusted on retrieval + t.Errorf("Abnormal expiry range: expected %d-%d, got %d", 999*time.Second, 1000*time.Second, record.Expiry) + } +} + +func TestWriteExpiryWithExpiryAndRead(t *testing.T) { + cache := NewMemStore() + + data := map[string]string{ + "abaya": "v329487", + "abaaz": "v398342", + "abayakjdkj": "v989898", + "zzaz": "viaooouyenbdnya", + "abazzz": "v57869nbdnya", + "mbmbmb": "viuyenbdnya", + "mbzzaamb": "viunya", + "zozzz": "vooouyenbdnya", + } + + for key, value := range data { + record := 
&store.Record{ + Key: key, + Value: []byte(value), + Expiry: time.Second * 1000, + } + // write option will override the record data + _ = cache.Write(record, store.WriteExpiry(time.Now().Add(time.Hour))) + } + + records, _ := cache.Read("zzaz") + if len(records) != 1 { + t.Fatalf("Failed read for \"zzaz\" returned %d records, expected 1", len(records)) + } + record := records[0] + if record.Expiry < 3599*time.Second || record.Expiry > 3600*time.Second { + // The expiry will be adjusted on retrieval + t.Errorf("Abnormal expiry range: expected %d-%d, got %d", 3599*time.Second, 3600*time.Second, record.Expiry) + } +} + +func TestWriteExpiryWithTTLAndRead(t *testing.T) { + cache := NewMemStore() + + data := map[string]string{ + "abaya": "v329487", + "abaaz": "v398342", + "abayakjdkj": "v989898", + "zzaz": "viaooouyenbdnya", + "abazzz": "v57869nbdnya", + "mbmbmb": "viuyenbdnya", + "mbzzaamb": "viunya", + "zozzz": "vooouyenbdnya", + } + + for key, value := range data { + record := &store.Record{ + Key: key, + Value: []byte(value), + Expiry: time.Second * 1000, + } + // write option will override the record data, TTL takes precedence + _ = cache.Write(record, store.WriteTTL(20*time.Second), store.WriteExpiry(time.Now().Add(time.Hour))) + } + + records, _ := cache.Read("zzaz") + if len(records) != 1 { + t.Fatalf("Failed read for \"zzaz\" returned %d records, expected 1", len(records)) + } + record := records[0] + if record.Expiry < 19*time.Second || record.Expiry > 20*time.Second { + // The expiry will be adjusted on retrieval + t.Errorf("Abnormal expiry range: expected %d-%d, got %d", 19*time.Second, 20*time.Second, record.Expiry) + } +} + +func TestDelete(t *testing.T) { + cache := NewMemStore() + record := &store.Record{ + Key: "record", + Value: []byte("value for record"), + } + + records, err := cache.Read("record") + if err != store.ErrNotFound && len(records) > 0 { + t.Fatal("Found key in cache but it shouldn't be there") + } + + _ = cache.Write(record) + records, 
err = cache.Read("record") + if err != nil { + t.Fatal("Key not found in cache after inserting it") + } + if len(records) != 1 { + t.Fatal("Multiple keys found in cache after inserting it") + } + if records[0].Key != "record" && string(records[0].Value) != "value for record" { + t.Fatal("Wrong record retrieved") + } + + err = cache.Delete("record") + if err != nil { + t.Fatal("Error deleting the record") + } + + records, err = cache.Read("record") + if err != store.ErrNotFound && len(records) > 0 { + t.Fatal("Found key in cache but it shouldn't be there") + } +} + +func TestList(t *testing.T) { + cache := NewMemStore() + data := map[string]string{ + "abaya": "v329487", + "abaaz": "v398342", + "abayakjdkj": "v989898", + "zzzz": "viaooouyenbdnya", + "abazzz": "v57869nbdnya", + "mbmbmb": "viuyenbdnya", + "zozzz": "vooouyenbdnya", + "aboyo": "v889487", + "zzaaaz": "v999487", + } + + for key, value := range data { + record := &store.Record{ + Key: key, + Value: []byte(value), + } + _ = cache.Write(record) + } + + t.Run("Plain", func(t *testing.T) { + listPlain(t, cache) + }) + t.Run("Prefix", func(t *testing.T) { + listPrefix(t, cache) + }) + t.Run("Suffix", func(t *testing.T) { + listSuffix(t, cache) + }) + t.Run("PrefixSuffix", func(t *testing.T) { + listPrefixSuffix(t, cache) + }) + t.Run("LimitOffset", func(t *testing.T) { + listLimitOffset(t, cache) + }) + t.Run("PrefixLimitOffset", func(t *testing.T) { + listPrefixLimitOffset(t, cache) + }) + t.Run("SuffixLimitOffset", func(t *testing.T) { + listSuffixLimitOffset(t, cache) + }) + t.Run("PrefixSuffixLimitOffset", func(t *testing.T) { + listPrefixSuffixLimitOffset(t, cache) + }) +} + +func listPlain(t *testing.T, cache store.Store) { + keys, _ := cache.List() + expectedKeys := []string{"abaaz", "abaya", "abayakjdkj", "abazzz", "aboyo", "mbmbmb", "zozzz", "zzaaaz", "zzzz"} + if len(keys) != len(expectedKeys) { + t.Fatalf("Wrong number of keys, expected %d, got %d", len(expectedKeys), len(keys)) + } + + for index, key 
:= range keys { + if key != expectedKeys[index] { + t.Errorf("Wrong key in the list in index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func listPrefix(t *testing.T, cache store.Store) { + keys, _ := cache.List(store.ListPrefix("aba")) + expectedKeys := []string{"abaaz", "abaya", "abayakjdkj", "abazzz"} + if len(keys) != len(expectedKeys) { + t.Fatalf("Wrong number of keys, expected %d, got %d", len(expectedKeys), len(keys)) + } + + for index, key := range keys { + if key != expectedKeys[index] { + t.Errorf("Wrong key in the list in index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func listSuffix(t *testing.T, cache store.Store) { + keys, _ := cache.List(store.ListSuffix("z")) + expectedKeys := []string{"zzaaaz", "abaaz", "abazzz", "zozzz", "zzzz"} + if len(keys) != len(expectedKeys) { + t.Fatalf("Wrong number of keys, expected %d, got %d", len(expectedKeys), len(keys)) + } + + for index, key := range keys { + if key != expectedKeys[index] { + t.Errorf("Wrong key in the list in index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func listPrefixSuffix(t *testing.T, cache store.Store) { + keys, _ := cache.List(store.ListPrefix("ab"), store.ListSuffix("z")) + expectedKeys := []string{"abaaz", "abazzz"} + if len(keys) != len(expectedKeys) { + t.Fatalf("Wrong number of keys, expected %d, got %d", len(expectedKeys), len(keys)) + } + + for index, key := range keys { + if key != expectedKeys[index] { + t.Errorf("Wrong key in the list in index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func listLimitOffset(t *testing.T, cache store.Store) { + keys, _ := cache.List(store.ListLimit(3), store.ListOffset(2)) + expectedKeys := []string{"abayakjdkj", "abazzz", "aboyo"} + if len(keys) != len(expectedKeys) { + t.Fatalf("Wrong number of keys, expected %d, got %d", len(expectedKeys), len(keys)) + } + + for index, key := range keys { + if key != expectedKeys[index] { + 
t.Errorf("Wrong key in the list in index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func listPrefixLimitOffset(t *testing.T, cache store.Store) { + keys, _ := cache.List(store.ListPrefix("aba"), store.ListLimit(2), store.ListOffset(1)) + expectedKeys := []string{"abaya", "abayakjdkj"} + if len(keys) != len(expectedKeys) { + t.Fatalf("Wrong number of keys, expected %d, got %d", len(expectedKeys), len(keys)) + } + + for index, key := range keys { + if key != expectedKeys[index] { + t.Errorf("Wrong key in the list in index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func listSuffixLimitOffset(t *testing.T, cache store.Store) { + keys, _ := cache.List(store.ListSuffix("z"), store.ListLimit(2), store.ListOffset(1)) + expectedKeys := []string{"abaaz", "abazzz"} + if len(keys) != len(expectedKeys) { + t.Fatalf("Wrong number of keys, expected %d, got %d", len(expectedKeys), len(keys)) + } + + for index, key := range keys { + if key != expectedKeys[index] { + t.Errorf("Wrong key in the list in index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func listPrefixSuffixLimitOffset(t *testing.T, cache store.Store) { + keys, _ := cache.List(store.ListPrefix("a"), store.ListSuffix("z"), store.ListLimit(2), store.ListOffset(1)) + expectedKeys := []string{"abazzz"} // only 2 available, and we skip the first one + if len(keys) != len(expectedKeys) { + t.Fatalf("Wrong number of keys, expected %d, got %d", len(expectedKeys), len(keys)) + } + + for index, key := range keys { + if key != expectedKeys[index] { + t.Errorf("Wrong key in the list in index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func TestEvictWriteUpdate(t *testing.T) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": 3, + }, + )), + ) + + for i := 0; i < 3; i++ { + v := strconv.Itoa(i) + record := &store.Record{ + Key: v, + Value: 
[]byte(v), + } + _ = cache.Write(record) + } + + // update first item + updatedRecord := &store.Record{ + Key: "0", + Value: []byte("zero"), + } + _ = cache.Write(updatedRecord) + + // new record, to force eviction + newRecord := &store.Record{ + Key: "new", + Value: []byte("newNew"), + } + _ = cache.Write(newRecord) + + records, _ := cache.Read("", store.ReadPrefix()) + if len(records) != 3 { + t.Fatalf("Wrong number of record returned, expected 3, got %d", len(records)) + } + + expectedKV := []struct { + Key string + Value string + }{ + {Key: "0", Value: "zero"}, + {Key: "2", Value: "2"}, + {Key: "new", Value: "newNew"}, + } + + for index, record := range records { + if record.Key != expectedKV[index].Key { + t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKV[index].Key, record.Key) + } + if string(record.Value) != expectedKV[index].Value { + t.Errorf("Wrong value for index %d, expected %s, got %s", index, expectedKV[index].Value, string(record.Value)) + } + } +} + +func TestEvictRead(t *testing.T) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": 3, + }, + )), + ) + + for i := 0; i < 3; i++ { + v := strconv.Itoa(i) + record := &store.Record{ + Key: v, + Value: []byte(v), + } + _ = cache.Write(record) + } + + // Read first item + _, _ = cache.Read("0") + + // new record, to force eviction + newRecord := &store.Record{ + Key: "new", + Value: []byte("newNew"), + } + _ = cache.Write(newRecord) + + records, _ := cache.Read("", store.ReadPrefix()) + if len(records) != 3 { + t.Fatalf("Wrong number of record returned, expected 3, got %d", len(records)) + } + + expectedKV := []struct { + Key string + Value string + }{ + {Key: "0", Value: "0"}, + {Key: "2", Value: "2"}, + {Key: "new", Value: "newNew"}, + } + + for index, record := range records { + if record.Key != expectedKV[index].Key { + t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKV[index].Key, 
record.Key) + } + if string(record.Value) != expectedKV[index].Value { + t.Errorf("Wrong value for index %d, expected %s, got %s", index, expectedKV[index].Value, string(record.Value)) + } + } +} + +func TestEvictReadPrefix(t *testing.T) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": 3, + }, + )), + ) + + for i := 0; i < 3; i++ { + v := strconv.Itoa(i) + record := &store.Record{ + Key: v, + Value: []byte(v), + } + _ = cache.Write(record) + } + + // Read prefix won't change evcition list + _, _ = cache.Read("0", store.ReadPrefix()) + + // new record, to force eviction + newRecord := &store.Record{ + Key: "new", + Value: []byte("newNew"), + } + _ = cache.Write(newRecord) + + records, _ := cache.Read("", store.ReadPrefix()) + if len(records) != 3 { + t.Fatalf("Wrong number of record returned, expected 3, got %d", len(records)) + } + + expectedKV := []struct { + Key string + Value string + }{ + {Key: "1", Value: "1"}, + {Key: "2", Value: "2"}, + {Key: "new", Value: "newNew"}, + } + + for index, record := range records { + if record.Key != expectedKV[index].Key { + t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKV[index].Key, record.Key) + } + if string(record.Value) != expectedKV[index].Value { + t.Errorf("Wrong value for index %d, expected %s, got %s", index, expectedKV[index].Value, string(record.Value)) + } + } +} + +func TestEvictReadSuffix(t *testing.T) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": 3, + }, + )), + ) + + for i := 0; i < 3; i++ { + v := strconv.Itoa(i) + record := &store.Record{ + Key: v, + Value: []byte(v), + } + _ = cache.Write(record) + } + + // Read suffix won't change evcition list + _, _ = cache.Read("0", store.ReadSuffix()) + + // new record, to force eviction + newRecord := &store.Record{ + Key: "new", + Value: []byte("newNew"), + } + _ = cache.Write(newRecord) + + 
records, _ := cache.Read("", store.ReadPrefix()) + if len(records) != 3 { + t.Fatalf("Wrong number of record returned, expected 3, got %d", len(records)) + } + + expectedKV := []struct { + Key string + Value string + }{ + {Key: "1", Value: "1"}, + {Key: "2", Value: "2"}, + {Key: "new", Value: "newNew"}, + } + + for index, record := range records { + if record.Key != expectedKV[index].Key { + t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKV[index].Key, record.Key) + } + if string(record.Value) != expectedKV[index].Value { + t.Errorf("Wrong value for index %d, expected %s, got %s", index, expectedKV[index].Value, string(record.Value)) + } + } +} + +func TestEvictList(t *testing.T) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": 3, + }, + )), + ) + + for i := 0; i < 3; i++ { + v := strconv.Itoa(i) + record := &store.Record{ + Key: v, + Value: []byte(v), + } + _ = cache.Write(record) + } + + // List won't change evcition list + _, _ = cache.List() + + // new record, to force eviction + newRecord := &store.Record{ + Key: "new", + Value: []byte("newNew"), + } + _ = cache.Write(newRecord) + + records, _ := cache.Read("", store.ReadPrefix()) + if len(records) != 3 { + t.Fatalf("Wrong number of record returned, expected 3, got %d", len(records)) + } + + expectedKV := []struct { + Key string + Value string + }{ + {Key: "1", Value: "1"}, + {Key: "2", Value: "2"}, + {Key: "new", Value: "newNew"}, + } + + for index, record := range records { + if record.Key != expectedKV[index].Key { + t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKV[index].Key, record.Key) + } + if string(record.Value) != expectedKV[index].Value { + t.Errorf("Wrong value for index %d, expected %s, got %s", index, expectedKV[index].Value, string(record.Value)) + } + } +} + +func TestExpireReadPrefix(t *testing.T) { + cache := NewMemStore() + + record := &store.Record{} + for i := 0; i < 
20; i++ { + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + if i%2 == 0 { + record.Expiry = time.Duration(i) * time.Minute + } else { + record.Expiry = time.Duration(-i) * time.Minute + } + _ = cache.Write(record) + } + + records, _ := cache.Read("", store.ReadPrefix()) + if len(records) != 10 { + t.Fatalf("Wrong number of records, expected 10, got %d", len(records)) + } + + var expKeys []string + for i := 0; i < 20; i++ { + if i%2 == 0 { + expKeys = append(expKeys, strconv.Itoa(i)) + } + } + sort.Strings(expKeys) + + expKeyIndex := 0 + for _, record := range records { + if record.Key != expKeys[expKeyIndex] { + t.Fatalf("Wrong expected key, expected %s, got %s", expKeys[expKeyIndex], record.Key) + } + expKeyIndex++ + } +} + +func TestExpireReadSuffix(t *testing.T) { + cache := NewMemStore() + + record := &store.Record{} + for i := 0; i < 20; i++ { + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + if i%2 == 0 { + record.Expiry = time.Duration(i) * time.Minute + } else { + record.Expiry = time.Duration(-i) * time.Minute + } + _ = cache.Write(record) + } + + records, _ := cache.Read("", store.ReadSuffix()) + if len(records) != 10 { + t.Fatalf("Wrong number of records, expected 10, got %d", len(records)) + } + + var expKeys []string + for i := 0; i < 20; i++ { + if i%2 == 0 { + expKeys = append(expKeys, strconv.Itoa(i)) + } + } + sort.Slice(expKeys, func(i, j int) bool { + return reverseString(expKeys[i]) < reverseString(expKeys[j]) + }) + + expKeyIndex := 0 + for _, record := range records { + if record.Key != expKeys[expKeyIndex] { + t.Fatalf("Wrong expected key, expected %s, got %s", expKeys[expKeyIndex], record.Key) + } + expKeyIndex++ + } +} + +func TestExpireList(t *testing.T) { + cache := NewMemStore() + + record := &store.Record{} + for i := 0; i < 20; i++ { + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + if i%2 == 0 { + record.Expiry = time.Duration(i) * time.Minute + } else { + record.Expiry = 
time.Duration(-i) * time.Minute + } + _ = cache.Write(record) + } + + keys, _ := cache.List() + if len(keys) != 10 { + t.Fatalf("Wrong number of records, expected 10, got %d", len(keys)) + } + + var expKeys []string + for i := 0; i < 20; i++ { + if i%2 == 0 { + expKeys = append(expKeys, strconv.Itoa(i)) + } + } + sort.Strings(expKeys) + + expKeyIndex := 0 + for _, key := range keys { + if key != expKeys[expKeyIndex] { + t.Fatalf("Wrong expected key, expected %s, got %s", expKeys[expKeyIndex], key) + } + expKeyIndex++ + } +} + +func TestExpireListPrefix(t *testing.T) { + cache := NewMemStore() + + record := &store.Record{} + for i := 0; i < 20; i++ { + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + if i%2 == 0 { + record.Expiry = time.Duration(i) * time.Minute + } else { + record.Expiry = time.Duration(-i) * time.Minute + } + _ = cache.Write(record) + } + + keys, _ := cache.List(store.ListPrefix("1")) + if len(keys) != 5 { + t.Fatalf("Wrong number of records, expected 5, got %d", len(keys)) + } + + var expKeys []string + for i := 0; i < 20; i++ { + v := strconv.Itoa(i) + if i%2 == 0 && strings.HasPrefix(v, "1") { + expKeys = append(expKeys, v) + } + } + sort.Strings(expKeys) + + expKeyIndex := 0 + for _, key := range keys { + if key != expKeys[expKeyIndex] { + t.Fatalf("Wrong expected key, expected %s, got %s", expKeys[expKeyIndex], key) + } + expKeyIndex++ + } +} + +func TestExpireListSuffix(t *testing.T) { + cache := NewMemStore() + + record := &store.Record{} + for i := 0; i < 20; i++ { + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + if i%2 == 0 { + record.Expiry = time.Duration(i) * time.Minute + } else { + record.Expiry = time.Duration(-i) * time.Minute + } + _ = cache.Write(record) + } + + keys, _ := cache.List(store.ListSuffix("8")) + if len(keys) != 2 { + t.Fatalf("Wrong number of records, expected 2, got %d", len(keys)) + } + + var expKeys []string + for i := 0; i < 20; i++ { + v := strconv.Itoa(i) + if i%2 == 0 && 
strings.HasSuffix(v, "8") { + expKeys = append(expKeys, v) + } + } + sort.Slice(expKeys, func(i, j int) bool { + return reverseString(expKeys[i]) < reverseString(expKeys[j]) + }) + + expKeyIndex := 0 + for _, key := range keys { + if key != expKeys[expKeyIndex] { + t.Fatalf("Wrong expected key, expected %s, got %s", expKeys[expKeyIndex], key) + } + expKeyIndex++ + } +} + +func TestConcurrentWrite(t *testing.T) { + nThreads := []int{3, 10, 50} + + for _, threads := range nThreads { + t.Run("T"+strconv.Itoa(threads), func(t *testing.T) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": 50000, + }, + )), + ) + + var wg sync.WaitGroup + var index int64 + + wg.Add(threads) + for i := 0; i < threads; i++ { + go func(cache store.Store, ind *int64) { + j := atomic.AddInt64(ind, 1) - 1 + for j < 100000 { + v := strconv.FormatInt(j, 10) + record := &store.Record{ + Key: v, + Value: []byte(v), + } + _ = cache.Write(record) + j = atomic.AddInt64(ind, 1) - 1 + } + wg.Done() + }(cache, &index) + } + wg.Wait() + + records, _ := cache.Read("", store.ReadPrefix()) + if len(records) != 50000 { + t.Fatalf("Wrong number of records, expected 50000, got %d", len(records)) + } + for _, record := range records { + if record.Key != string(record.Value) { + t.Fatalf("Wrong record found, key %s, value %s", record.Key, string(record.Value)) + } + } + }) + } +} + +func BenchmarkWrite(b *testing.B) { + cacheSizes := []int{512, 1024, 10000, 50000, 1000000} + + for _, size := range cacheSizes { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + record := &store.Record{} + + b.Run("CacheSize"+strconv.Itoa(size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + // records will be copied, so it's safe to overwrite the previous record + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + _ = 
cache.Write(record) + } + }) + } +} + +func BenchmarkRead(b *testing.B) { + cacheSizes := []int{512, 1024, 10000, 50000, 1000000} + + for _, size := range cacheSizes { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + record := &store.Record{} + + for i := 0; i < size; i++ { + // records will be copied, so it's safe to overwrite the previous record + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + _ = cache.Write(record) + } + b.Run("CacheSize"+strconv.Itoa(size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + v := strconv.Itoa(i) + _, _ = cache.Read(v) + } + }) + } +} + +func BenchmarkWriteMedKey(b *testing.B) { + cacheSizes := []int{512, 1024, 10000, 50000, 1000000} + + h := fnv.New128() + for _, size := range cacheSizes { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + record := &store.Record{} + + b.Run("CacheSize"+strconv.Itoa(size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + h.Reset() + v := strconv.Itoa(i) + bys := []byte(v) + h.Write(bys) + // records will be copied, so it's safe to overwrite the previous record + record.Key = hex.EncodeToString(h.Sum(nil)) + record.Value = bys + _ = cache.Write(record) + } + }) + } +} + +func BenchmarkReadMedKey(b *testing.B) { + cacheSizes := []int{512, 1024, 10000, 50000, 1000000} + + h := fnv.New128() + for _, size := range cacheSizes { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + record := &store.Record{} + + for i := 0; i < size; i++ { + h.Reset() + v := strconv.Itoa(i) + bys := []byte(v) + h.Write(bys) + // records will be copied, so it's safe to overwrite the previous record + record.Key = hex.EncodeToString(h.Sum(nil)) + record.Value = bys + _ = cache.Write(record) 
+ } + b.Run("CacheSize"+strconv.Itoa(size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + h.Reset() + v := strconv.Itoa(i) + bys := []byte(v) + h.Write(bys) + _, _ = cache.Read(hex.EncodeToString(h.Sum(nil))) + } + }) + } +} + +func BenchmarkReadMedKeyPrefix(b *testing.B) { + cacheSizes := []int{512, 1024, 10000, 50000, 1000000} + + h := fnv.New128() + for _, size := range cacheSizes { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + record := &store.Record{} + + for i := 0; i < size; i++ { + h.Reset() + v := strconv.Itoa(i) + bys := []byte(v) + h.Write(bys) + // records will be copied, so it's safe to overwrite the previous record + record.Key = hex.EncodeToString(h.Sum(nil)) + record.Value = bys + _ = cache.Write(record) + } + b.Run("CacheSize"+strconv.Itoa(size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + h.Reset() + v := strconv.Itoa(i) + bys := []byte(v) + h.Write(bys) + _, _ = cache.Read(hex.EncodeToString(h.Sum(nil))[:10], store.ReadPrefix(), store.ReadLimit(50)) + } + }) + } +} + +func BenchmarkReadMedKeySuffix(b *testing.B) { + cacheSizes := []int{512, 1024, 10000, 50000, 1000000} + + h := fnv.New128() + for _, size := range cacheSizes { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + record := &store.Record{} + + for i := 0; i < size; i++ { + h.Reset() + v := strconv.Itoa(i) + bys := []byte(v) + h.Write(bys) + // records will be copied, so it's safe to overwrite the previous record + record.Key = hex.EncodeToString(h.Sum(nil)) + record.Value = bys + _ = cache.Write(record) + } + b.Run("CacheSize"+strconv.Itoa(size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + h.Reset() + v := strconv.Itoa(i) + bys := []byte(v) + h.Write(bys) + _, _ = cache.Read(hex.EncodeToString(h.Sum(nil))[23:], 
store.ReadSuffix(), store.ReadLimit(50)) + } + }) + } +} + +func concurrentStoreBench(b *testing.B, threads int) { + benchTest := map[string]int{ + "DefCap": 512, + "LimCap": 3, + "BigCap": 1000000, + } + for testname, size := range benchTest { + b.Run(testname, func(b *testing.B) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + + b.SetParallelism(threads) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + h := fnv.New128() + record := &store.Record{} + for pb.Next() { + h.Reset() + v := strconv.Itoa(rand.Int()) //nolint:gosec + bys := []byte(v) + h.Write(bys) + // records will be copied, so it's safe to overwrite the previous record + record.Key = hex.EncodeToString(h.Sum(nil)) + record.Value = bys + _ = cache.Write(record) + } + }) + }) + } +} + +func concurrentRetrieveBench(b *testing.B, threads int) { + benchTest := map[string]int{ + "DefCap": 512, + "LimCap": 3, + "BigCap": 1000000, + } + for testname, size := range benchTest { + b.Run(testname, func(b *testing.B) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + + record := &store.Record{} + for i := 0; i < size; i++ { + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + _ = cache.Write(record) + } + + b.SetParallelism(threads) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + v := strconv.Itoa(rand.Intn(size * 2)) //nolint:gosec + _, _ = cache.Read(v) + } + }) + }) + } +} + +func concurrentRemoveBench(b *testing.B, threads int) { + benchTest := map[string]int{ + "DefCap": 512, + "LimCap": 3, + "BigCap": 1000000, + } + for testname, size := range benchTest { + b.Run(testname, func(b *testing.B) { + cache := NewMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": size, + }, + )), + ) + + record := &store.Record{} + for 
i := 0; i < size; i++ { + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + _ = cache.Write(record) + } + + b.SetParallelism(threads) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + record := &store.Record{} + for pb.Next() { + v := strconv.Itoa(rand.Intn(size * 2)) //nolint:gosec + _ = cache.Delete(v) + record.Key = v + record.Value = []byte(v) + _ = cache.Write(record) + } + }) + }) + } +} + +func BenchmarkConcurrent(b *testing.B) { + threads := []int{3, 10, 50} + for _, nThreads := range threads { + nt := strconv.Itoa(nThreads) + b.Run("StoreT"+nt, func(b *testing.B) { + concurrentStoreBench(b, nThreads) + }) + b.Run("RetrieveT"+nt, func(b *testing.B) { + concurrentRetrieveBench(b, nThreads) + }) + b.Run("RemoveT"+nt, func(b *testing.B) { + concurrentRemoveBench(b, nThreads) + }) + } +} diff --git a/ocis-pkg/store/memory/multimemstore.go b/ocis-pkg/store/memory/multimemstore.go new file mode 100644 index 0000000000..89756c994c --- /dev/null +++ b/ocis-pkg/store/memory/multimemstore.go @@ -0,0 +1,158 @@ +package memory + +import ( + "sync" + + "go-micro.dev/v4/store" +) + +// In-memory store implementation using multiple MemStore to provide support +// for multiple databases and tables. +// Each table will be mapped to its own MemStore, which will be completely +// isolated from the rest. In particular, each MemStore will have its own +// capacity, so it's possible to have 10 MemStores with full capacity (512 +// by default) +// +// The options will be the same for all MemStores unless they're explicitly +// initialized otherwise. +// +// Since each MemStore is isolated, the required synchronization caused by +// concurrency will be minimal if the threads use different tables +type MultiMemStore struct { + storeMap map[string]*MemStore + storeMapLock sync.RWMutex + genOpts []store.Option +} + +// Create a new MultiMemStore. A new MemStore will be mapped based on the options. 
+// A default MemStore will be mapped if no Database and Table aren't used. +func NewMultiMemStore(opts ...store.Option) store.Store { + m := &MultiMemStore{ + storeMap: make(map[string]*MemStore), + genOpts: opts, + } + _ = m.Init(opts...) + return m +} + +func (m *MultiMemStore) getMemStore(prefix string) *MemStore { + m.storeMapLock.RLock() + mStore, exists := m.storeMap[prefix] + + if exists { + m.storeMapLock.RUnlock() + return mStore + } + + m.storeMapLock.RUnlock() + + // if not exists + newStore := NewMemStore(m.genOpts...).(*MemStore) + + m.storeMapLock.Lock() + m.storeMap[prefix] = newStore + m.storeMapLock.Unlock() + return newStore +} + +// Initialize the mapped MemStore based on the Database and Table values +// from the options with the same options. The target MemStore will be +// reinitialized if needed. +func (m *MultiMemStore) Init(opts ...store.Option) error { + optList := store.Options{} + for _, opt := range opts { + opt(&optList) + } + + prefix := optList.Database + "/" + optList.Table + + mStore := m.getMemStore(prefix) + return mStore.Init(opts...) +} + +// Get the options used to create the MultiMemStore. +// Specific options for each MemStore aren't available +func (m *MultiMemStore) Options() store.Options { + optList := store.Options{} + for _, opt := range m.genOpts { + opt(&optList) + } + return optList +} + +// Write the record in the target MemStore based on the Database and Table +// values from the options. A default MemStore will be used if no Database +// and Table options are provided. +// The write options will be forwarded to the target MemStore +func (m *MultiMemStore) Write(r *store.Record, opts ...store.WriteOption) error { + wopts := store.WriteOptions{} + for _, opt := range opts { + opt(&wopts) + } + + prefix := wopts.Database + "/" + wopts.Table + + mStore := m.getMemStore(prefix) + return mStore.Write(r, opts...) 
+} + +// Read the matching records in the target MemStore based on the Database and Table +// values from the options. A default MemStore will be used if no Database +// and Table options are provided. +// The read options will be forwarded to the target MemStore. +// +// The expectations regarding the results (sort order, eviction policies, etc) +// will be the same as the target MemStore +func (m *MultiMemStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + ropts := store.ReadOptions{} + for _, opt := range opts { + opt(&ropts) + } + + prefix := ropts.Database + "/" + ropts.Table + + mStore := m.getMemStore(prefix) + return mStore.Read(key, opts...) +} + +// Delete the matching records in the target MemStore based on the Database and Table +// values from the options. A default MemStore will be used if no Database +// and Table options are provided. +// +// Matching records from other Tables won't be affected. In fact, we won't +// access to other Tables +func (m *MultiMemStore) Delete(key string, opts ...store.DeleteOption) error { + dopts := store.DeleteOptions{} + for _, opt := range opts { + opt(&dopts) + } + + prefix := dopts.Database + "/" + dopts.Table + + mStore := m.getMemStore(prefix) + return mStore.Delete(key, opts...) +} + +// List the keys in the target MemStore based on the Database and Table +// values from the options. A default MemStore will be used if no Database +// and Table options are provided. +// The list options will be forwarded to the target MemStore. +func (m *MultiMemStore) List(opts ...store.ListOption) ([]string, error) { + lopts := store.ListOptions{} + for _, opt := range opts { + opt(&lopts) + } + + prefix := lopts.Database + "/" + lopts.Table + + mStore := m.getMemStore(prefix) + return mStore.List(opts...) 
+} + +func (m *MultiMemStore) Close() error { + return nil +} + +func (m *MultiMemStore) String() string { + return "MultiRadixMemStore" +} diff --git a/ocis-pkg/store/memory/multimemstore_test.go b/ocis-pkg/store/memory/multimemstore_test.go new file mode 100644 index 0000000000..30a9c8851f --- /dev/null +++ b/ocis-pkg/store/memory/multimemstore_test.go @@ -0,0 +1,172 @@ +package memory + +import ( + "context" + "strconv" + "testing" + + "go-micro.dev/v4/store" +) + +func TestWriteReadTables(t *testing.T) { + cache := NewMultiMemStore() + + record1 := &store.Record{ + Key: "sameKey", + Value: []byte("from record1"), + } + record2 := &store.Record{ + Key: "sameKey", + Value: []byte("from record2"), + } + + _ = cache.Write(record1) + _ = cache.Write(record2, store.WriteTo("DB02", "Table02")) + + records1, _ := cache.Read("sameKey") + if len(records1) != 1 { + t.Fatalf("Wrong number of records, expected 1, got %d", len(records1)) + } + if records1[0].Key != "sameKey" { + t.Errorf("Wrong key, expected \"sameKey\", got %s", records1[0].Key) + } + if string(records1[0].Value) != "from record1" { + t.Errorf("Wrong value, expected \"from record1\", got %s", string(records1[0].Value)) + } + + records2, _ := cache.Read("sameKey", store.ReadFrom("DB02", "Table02")) + if len(records2) != 1 { + t.Fatalf("Wrong number of records, expected 1, got %d", len(records2)) + } + if records2[0].Key != "sameKey" { + t.Errorf("Wrong key, expected \"sameKey\", got %s", records2[0].Key) + } + if string(records2[0].Value) != "from record2" { + t.Errorf("Wrong value, expected \"from record2\", got %s", string(records2[0].Value)) + } +} + +func TestDeleteTables(t *testing.T) { + cache := NewMultiMemStore() + + record1 := &store.Record{ + Key: "sameKey", + Value: []byte("from record1"), + } + record2 := &store.Record{ + Key: "sameKey", + Value: []byte("from record2"), + } + + _ = cache.Write(record1) + _ = cache.Write(record2, store.WriteTo("DB02", "Table02")) + + records1, _ := 
cache.Read("sameKey") + if len(records1) != 1 { + t.Fatalf("Wrong number of records, expected 1, got %d", len(records1)) + } + if records1[0].Key != "sameKey" { + t.Errorf("Wrong key, expected \"sameKey\", got %s", records1[0].Key) + } + if string(records1[0].Value) != "from record1" { + t.Errorf("Wrong value, expected \"from record1\", got %s", string(records1[0].Value)) + } + + records2, _ := cache.Read("sameKey", store.ReadFrom("DB02", "Table02")) + if len(records2) != 1 { + t.Fatalf("Wrong number of records, expected 1, got %d", len(records2)) + } + if records2[0].Key != "sameKey" { + t.Errorf("Wrong key, expected \"sameKey\", got %s", records2[0].Key) + } + if string(records2[0].Value) != "from record2" { + t.Errorf("Wrong value, expected \"from record2\", got %s", string(records2[0].Value)) + } + + _ = cache.Delete("sameKey") + if _, err := cache.Read("sameKey"); err != store.ErrNotFound { + t.Errorf("Key \"sameKey\" still exists after deletion") + } + + records2, _ = cache.Read("sameKey", store.ReadFrom("DB02", "Table02")) + if len(records2) != 1 { + t.Fatalf("Wrong number of records, expected 1, got %d", len(records2)) + } + if records2[0].Key != "sameKey" { + t.Errorf("Wrong key, expected \"sameKey\", got %s", records2[0].Key) + } + if string(records2[0].Value) != "from record2" { + t.Errorf("Wrong value, expected \"from record2\", got %s", string(records2[0].Value)) + } +} + +func TestListTables(t *testing.T) { + cache := NewMultiMemStore() + + record1 := &store.Record{ + Key: "key001", + Value: []byte("from record1"), + } + record2 := &store.Record{ + Key: "key002", + Value: []byte("from record2"), + } + + _ = cache.Write(record1) + _ = cache.Write(record2, store.WriteTo("DB02", "Table02")) + + keys, _ := cache.List(store.ListFrom("DB02", "Table02")) + expectedKeys := []string{"key002"} + if len(keys) != 1 { + t.Fatalf("Wrong number of keys, expected 1, got %d", len(keys)) + } + for index, key := range keys { + if expectedKeys[index] != key { + 
t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKeys[index], key) + } + } +} + +func TestWriteSizeLimit(t *testing.T) { + cache := NewMultiMemStore( + store.WithContext( + NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": 2, + }, + ), + ), + ) + + record := &store.Record{} + for i := 0; i < 4; i++ { + v := strconv.Itoa(i) + record.Key = v + record.Value = []byte(v) + _ = cache.Write(record) + _ = cache.Write(record, store.WriteTo("DB02", "Table02")) + } + + keys1, _ := cache.List() + expectedKeys1 := []string{"2", "3"} + if len(keys1) != 2 { + t.Fatalf("Wrong number of keys, expected 2, got %d", len(keys1)) + } + for index, key := range keys1 { + if expectedKeys1[index] != key { + t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKeys1[index], key) + } + } + + keys2, _ := cache.List(store.ListFrom("DB02", "Table02")) + expectedKeys2 := []string{"2", "3"} + if len(keys2) != 2 { + t.Fatalf("Wrong number of keys, expected 2, got %d", len(keys2)) + } + for index, key := range keys2 { + if expectedKeys2[index] != key { + t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKeys2[index], key) + } + } +} diff --git a/ocis-pkg/store/memory/utils.go b/ocis-pkg/store/memory/utils.go new file mode 100644 index 0000000000..773567f6c4 --- /dev/null +++ b/ocis-pkg/store/memory/utils.go @@ -0,0 +1,63 @@ +package memory + +import ( + "time" + + "go-micro.dev/v4/store" +) + +func toStoreRecord(src *store.Record, options store.WriteOptions) *storeRecord { + newRecord := &storeRecord{} + newRecord.Key = src.Key + newRecord.Value = make([]byte, len(src.Value)) + copy(newRecord.Value, src.Value) + + // set base ttl duration and expiration time based on the record + newRecord.Expiry = src.Expiry + if src.Expiry != 0 { + newRecord.ExpiresAt = time.Now().Add(src.Expiry) + } + + // overwrite ttl duration and expiration time based on options + if !options.Expiry.IsZero() { + // options.Expiry is a 
time.Time, newRecord.Expiry is a time.Duration + newRecord.Expiry = time.Until(options.Expiry) + newRecord.ExpiresAt = options.Expiry + } + + // TTL option takes precedence over expiration time + if options.TTL != 0 { + newRecord.Expiry = options.TTL + newRecord.ExpiresAt = time.Now().Add(options.TTL) + } + + newRecord.Metadata = make(map[string]interface{}) + for k, v := range src.Metadata { + newRecord.Metadata[k] = v + } + return newRecord +} + +func fromStoreRecord(src *storeRecord) *store.Record { + newRecord := &store.Record{} + newRecord.Key = src.Key + newRecord.Value = make([]byte, len(src.Value)) + copy(newRecord.Value, src.Value) + if src.Expiry != 0 { + newRecord.Expiry = time.Until(src.ExpiresAt) + } + + newRecord.Metadata = make(map[string]interface{}) + for k, v := range src.Metadata { + newRecord.Metadata[k] = v + } + return newRecord +} + +func reverseString(s string) string { + r := []rune(s) + for i, j := 0, len(r)-1; i < len(r)/2; i, j = i+1, j-1 { + r[i], r[j] = r[j], r[i] + } + return string(r) +} diff --git a/ocis-pkg/store/store.go b/ocis-pkg/store/store.go new file mode 100644 index 0000000000..f41cc7285c --- /dev/null +++ b/ocis-pkg/store/store.go @@ -0,0 +1,91 @@ +package store + +import ( + "context" + "strings" + + "github.com/owncloud/ocis/v2/ocis-pkg/store/etcd" + "github.com/owncloud/ocis/v2/ocis-pkg/store/memory" + "go-micro.dev/v4/store" +) + +var ( + storeEnv = "OCIS_STORE" + storeAddressEnv = "OCIS_STORE_ADDRESS" + storeOCMemSize = "OCIS_STORE_OCMEM_SIZE" +) + +var ocMemStore *store.Store + +type OcisStoreOptions struct { + Type string + Address string + Size int +} + +// Get the configured key-value store to be used. +// +// Each microservice (or whatever piece is using the store) should use the +// options available in the interface's operations to choose the right database +// and table to prevent collisions with other microservices. 
+// Recommended approach is to use "services" or "ocis-pkg" for the database, +// and "services//" or "ocis-pkg//" for the package name. +// +// So far, only the name of the store and the node addresses are configurable +// via environment variables. +// Available options for "OCIS_STORE" are: +// * "noop", for a noop store (it does nothing) +// * "etcd", for etcd +// * "ocmem", custom in-memory implementation, with fixed size and optimized prefix +// and suffix search +// * "memory", for a in-memory implementation, which is the default if noone matches +// +// "OCIS_STORE_ADDRESS" is a comma-separated list of nodes that the store +// will use. This is currently usable only with the etcd implementation. If it +// isn't provided, "127.0.0.1:2379" will be the only node used. +// +// "OCIS_STORE_OCMEM_SIZE" will configure the maximum capacity of the cache for +// the "ocmem" implementation, in number of items that the cache can hold per table. +// You can use "OCIS_STORE_OCMEM_SIZE=5000" so the cache will hold up to 5000 elements. +// The parameter only affects to the "ocmem" implementation, the rest will ignore it. +// If an invalid value is used, the default will be used instead, so up to 512 elements +// the cache will hold. +func GetStore(ocisOpts OcisStoreOptions) store.Store { + var s store.Store + + addresses := strings.Split(ocisOpts.Address, ",") + opts := []store.Option{ + store.Nodes(addresses...), + } + + switch ocisOpts.Type { + case "noop": + s = store.NewNoopStore(opts...) + case "etcd": + s = etcd.NewEtcdStore(opts...) + case "ocmem": + if ocMemStore == nil { + var memStore store.Store + + sizeNum := ocisOpts.Size + if sizeNum <= 0 { + memStore = memory.NewMultiMemStore() + } else { + memStore = memory.NewMultiMemStore( + store.WithContext( + memory.NewContext( + context.Background(), + map[string]interface{}{ + "maxCap": sizeNum, + }, + )), + ) + } + ocMemStore = &memStore + } + s = *ocMemStore + default: + s = store.NewMemoryStore(opts...) 
+ } + return s +} diff --git a/services/graph/pkg/config/cachestore.go b/services/graph/pkg/config/cachestore.go new file mode 100644 index 0000000000..f396ed7001 --- /dev/null +++ b/services/graph/pkg/config/cachestore.go @@ -0,0 +1,8 @@ +package config + +// CacheStore defines the available configuration for the cache store +type CacheStore struct { + Type string `yaml:"type" env:"OCIS_CACHE_STORE_TYPE;GRAPH_CACHE_STORE_TYPE" desc:"The type of the cache store. Valid options are \"noop\", \"ocmem\", \"etcd\" and \"memory\""` + Address string `yaml:"address" env:"OCIS_CACHE_STORE_ADDRESS;GRAPH_CACHE_STORE_ADDRESS" desc:"a comma-separated list of addresses to connect to. Only for etcd"` + Size int `yaml:"size" env:"OCIS_CACHE_STORE_SIZE;GRAPH_CACHE_STORE_SIZE" desc:"Maximum size for the cache store. Only ocmem will use this option, in number of items per table. The rest will ignore the option and can grow indefinitely"` +} diff --git a/services/graph/pkg/config/config.go b/services/graph/pkg/config/config.go index 6fea8abc12..8e42344e9f 100644 --- a/services/graph/pkg/config/config.go +++ b/services/graph/pkg/config/config.go @@ -12,9 +12,10 @@ type Config struct { Service Service `yaml:"-"` - Tracing *Tracing `yaml:"tracing"` - Log *Log `yaml:"log"` - Debug Debug `yaml:"debug"` + Tracing *Tracing `yaml:"tracing"` + Log *Log `yaml:"log"` + CacheStore *CacheStore `yaml:"cache_store"` + Debug Debug `yaml:"debug"` HTTP HTTP `yaml:"http"` diff --git a/services/graph/pkg/config/defaults/defaultconfig.go b/services/graph/pkg/config/defaults/defaultconfig.go index d9f34b54a7..a4648a5578 100644 --- a/services/graph/pkg/config/defaults/defaultconfig.go +++ b/services/graph/pkg/config/defaults/defaultconfig.go @@ -96,6 +96,16 @@ func EnsureDefaults(cfg *config.Config) { cfg.Tracing = &config.Tracing{} } + if cfg.CacheStore == nil && cfg.Commons != nil && cfg.Commons.CacheStore != nil { + cfg.CacheStore = &config.CacheStore{ + Type: cfg.Commons.CacheStore.Type, + Address: 
// CacheStore defines the available configuration for the cache store
type CacheStore struct {
	// Type selects the store implementation: "noop", "ocmem", "etcd" or "memory".
	Type string `yaml:"type" env:"OCIS_CACHE_STORE_TYPE;OCS_CACHE_STORE_TYPE" desc:"The type of the cache store. Valid options are \"noop\", \"ocmem\", \"etcd\" and \"memory\""`
	// Address is a comma-separated list of node addresses; only used by the etcd store.
	Address string `yaml:"address" env:"OCIS_CACHE_STORE_ADDRESS;OCS_CACHE_STORE_ADDRESS" desc:"a comma-separated list of addresses to connect to. Only for etcd"`
	// Size caps the number of items per table; only honored by the ocmem store.
	Size int `yaml:"size" env:"OCIS_CACHE_STORE_SIZE;OCS_CACHE_STORE_SIZE" desc:"Maximum size for the cache store. Only ocmem will use this option, in number of items per table. The rest will ignore the option and can grow indefinitely"`
}
100644 --- a/services/ocs/pkg/service/v0/service.go +++ b/services/ocs/pkg/service/v0/service.go @@ -2,10 +2,10 @@ package svc import ( "net/http" - "time" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + "github.com/owncloud/ocis/v2/ocis-pkg/store" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" @@ -42,9 +42,13 @@ func NewService(opts ...Option) Service { } roleManager := options.RoleManager if roleManager == nil { + storeOptions := store.OcisStoreOptions{ + Type: options.Config.CacheStore.Type, + Address: options.Config.CacheStore.Address, + Size: options.Config.CacheStore.Size, + } m := roles.NewManager( - roles.CacheSize(1024), - roles.CacheTTL(time.Hour*24*7), + roles.StoreOptions(storeOptions), roles.Logger(options.Logger), roles.RoleService(roleService), )