mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-04-27 10:48:35 -04:00
settings: User go-micro/store for cachiing
The previous implementation was using an unlimited TTL which would cause problems in scale out deployments where multiple instances of the settings service are running. Fixes: #5067
This commit is contained in:
committed by
Ralf Haferkamp
parent
18bb3dbaca
commit
df5edec36b
@@ -2,6 +2,7 @@ package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
|
||||
settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0"
|
||||
@@ -45,4 +46,16 @@ type Metadata struct {
|
||||
SystemUserID string `yaml:"system_user_id" env:"OCIS_SYSTEM_USER_ID;SETTINGS_SYSTEM_USER_ID" desc:"ID of the oCIS STORAGE-SYSTEM system user. Admins need to set the ID for the STORAGE-SYSTEM system user in this config option which is then used to reference the user. Any reasonable long string is possible, preferably this would be an UUIDv4 format."`
|
||||
SystemUserIDP string `yaml:"system_user_idp" env:"OCIS_SYSTEM_USER_IDP;SETTINGS_SYSTEM_USER_IDP" desc:"IDP of the oCIS STORAGE-SYSTEM system user."`
|
||||
SystemUserAPIKey string `yaml:"system_user_api_key" env:"OCIS_SYSTEM_USER_API_KEY" desc:"API key for the STORAGE-SYSTEM system user."`
|
||||
Cache *Cache `yaml:"cache"`
|
||||
}
|
||||
|
||||
// Cache configures the cache of the Metadata store
|
||||
type Cache struct {
|
||||
Store string `yaml:"store" env:"OCIS_CACHE_STORE;SETTINGS_CACHE_STORE" desc:"The type of the cache store. Supported values are: 'memory', 'ocmem', 'etcd', 'redis', 'redis-sentinel', 'nats-js', 'noop'. See the text description for details."`
|
||||
Nodes []string `yaml:"addresses" env:"OCIS_CACHE_STORE_NODES;SETTINGS_CACHE_STORE_NODES" desc:"A comma separated list of nodes to access the configured store. This has no effect when 'memory' or 'ocmem' stores are configured. Note that the behaviour how nodes are used is dependent on the library of the configured store."`
|
||||
Database string `yaml:"database" env:"OCIS_CACHE_DATABASE" desc:"The database name the configured store should use."`
|
||||
FileTable string `yaml:"files_table" env:"SETTINGS_FILE_CACHE_TABLE" desc:"The database table the store should use for the file cache."`
|
||||
DirectoryTable string `yaml:"directories_table" env:"SETTINGS_DIRECTORY_CACHE_TABLE" desc:"The database table the store should use for the directory cache."`
|
||||
TTL time.Duration `yaml:"ttl" env:"OCIS_CACHE_TTL;SETTINGS_CACHE_TTL" desc:"Default time to live for entries in the cache. Only applied when access tokens has no expiration. The duration can be set as number followed by a unit identifier like s, m or h. Defaults to '10m' (10 minutes)."`
|
||||
Size int `yaml:"size" env:"OCIS_CACHE_SIZE;SETTINGS_CACHE_SIZE" desc:"The maximum quantity of items in the cache. Only applies when store type 'ocmem' is configured. Defaults to 512."`
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/config/defaults"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/structs"
|
||||
@@ -55,6 +56,13 @@ func DefaultConfig() *config.Config {
|
||||
GatewayAddress: "127.0.0.1:9215", // system storage
|
||||
StorageAddress: "127.0.0.1:9215",
|
||||
SystemUserIDP: "internal",
|
||||
Cache: &config.Cache{
|
||||
Store: "memory",
|
||||
Database: "ocis",
|
||||
FileTable: "settings_files",
|
||||
DirectoryTable: "settings_dirs",
|
||||
TTL: time.Minute * 10,
|
||||
},
|
||||
},
|
||||
BundlesPath: "",
|
||||
Bundles: nil,
|
||||
|
||||
@@ -3,42 +3,49 @@ package store
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jellydator/ttlcache/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
cachettl = 0
|
||||
"github.com/cs3org/reva/v2/pkg/store"
|
||||
olog "github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/services/settings/pkg/config"
|
||||
"github.com/shamaton/msgpack/v2"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// CachedMDC is cache for the metadataclient
|
||||
type CachedMDC struct {
|
||||
next MetadataClient
|
||||
cfg *config.Config
|
||||
logger olog.Logger
|
||||
next MetadataClient
|
||||
|
||||
files *ttlcache.Cache
|
||||
dirs *ttlcache.Cache
|
||||
filesCache microstore.Store
|
||||
dirsCache microstore.Store
|
||||
}
|
||||
|
||||
// SimpleDownload caches the answer from SimpleDownload or returns the cached one
|
||||
func (c *CachedMDC) SimpleDownload(ctx context.Context, id string) ([]byte, error) {
|
||||
if b, err := c.files.Get(id); err == nil {
|
||||
return b.([]byte), nil
|
||||
if b, err := c.filesCache.Read(id); err == nil && len(b) == 1 {
|
||||
return b[0].Value, nil
|
||||
}
|
||||
b, err := c.next.SimpleDownload(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_ = c.files.Set(id, b)
|
||||
err = c.filesCache.Write(µstore.Record{
|
||||
Key: id,
|
||||
Value: b,
|
||||
Expiry: c.cfg.Metadata.Cache.TTL,
|
||||
})
|
||||
if err != nil {
|
||||
c.logger.Error().Err(err).Msg("SimpleDownload: failed to update to files cache")
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// SimpleUpload caches the answer from SimpleUpload and invalidates the cache
|
||||
func (c *CachedMDC) SimpleUpload(ctx context.Context, id string, content []byte) error {
|
||||
b, err := c.files.Get(id)
|
||||
if err == nil && string(b.([]byte)) == string(content) {
|
||||
b, err := c.filesCache.Read(id)
|
||||
if err == nil && len(b) == 1 && string(b[0].Value) == string(content) {
|
||||
// no need to bug mdc
|
||||
return nil
|
||||
}
|
||||
@@ -49,8 +56,18 @@ func (c *CachedMDC) SimpleUpload(ctx context.Context, id string, content []byte)
|
||||
}
|
||||
|
||||
// invalidate caches
|
||||
_ = c.dirs.Remove(path.Dir(id))
|
||||
_ = c.files.Set(id, content)
|
||||
if err = c.dirsCache.Delete(path.Dir(id)); err != nil {
|
||||
c.logger.Error().Err(err).Msg("failed to clear dirs cache")
|
||||
}
|
||||
|
||||
err = c.filesCache.Write(µstore.Record{
|
||||
Key: id,
|
||||
Value: content,
|
||||
Expiry: c.cfg.Metadata.Cache.TTL,
|
||||
})
|
||||
if err != nil {
|
||||
c.logger.Error().Err(err).Msg("SimpleUpload: failed to update to files cache")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -61,16 +78,20 @@ func (c *CachedMDC) Delete(ctx context.Context, id string) error {
|
||||
}
|
||||
|
||||
// invalidate caches
|
||||
_ = removePrefix(c.files, id)
|
||||
_ = removePrefix(c.dirs, id)
|
||||
_ = c.removePrefix(c.filesCache, id)
|
||||
_ = c.removePrefix(c.dirsCache, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadDir caches the response from ReadDir or returnes the cached one
|
||||
func (c *CachedMDC) ReadDir(ctx context.Context, id string) ([]string, error) {
|
||||
i, err := c.dirs.Get(id)
|
||||
if err == nil {
|
||||
return i.([]string), nil
|
||||
i, err := c.dirsCache.Read(id)
|
||||
if err == nil && len(i) == 1 {
|
||||
var ret []string
|
||||
if err = msgpack.Unmarshal(i[0].Value, &ret); err == nil {
|
||||
return ret, nil
|
||||
}
|
||||
c.logger.Error().Err(err).Msg("failed to unmarshal entry from dirs cache")
|
||||
}
|
||||
|
||||
s, err := c.next.ReadDir(ctx, id)
|
||||
@@ -78,7 +99,21 @@ func (c *CachedMDC) ReadDir(ctx context.Context, id string) ([]string, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, c.dirs.Set(id, s)
|
||||
var value []byte
|
||||
if value, err = msgpack.Marshal(s); err != nil {
|
||||
c.logger.Error().Err(err).Msg("failed to marshal ReadDir result for dirs cache")
|
||||
return s, err
|
||||
}
|
||||
err = c.dirsCache.Write(µstore.Record{
|
||||
Key: id,
|
||||
Value: value,
|
||||
Expiry: c.cfg.Metadata.Cache.TTL,
|
||||
})
|
||||
if err != nil {
|
||||
c.logger.Error().Err(err).Msg("ReadDir: failed to update dirs cache")
|
||||
}
|
||||
|
||||
return s, err
|
||||
}
|
||||
|
||||
// MakeDirIfNotExist invalidates the cache
|
||||
@@ -89,30 +124,44 @@ func (c *CachedMDC) MakeDirIfNotExist(ctx context.Context, id string) error {
|
||||
}
|
||||
|
||||
// invalidate caches
|
||||
_ = c.dirs.Remove(path.Dir(id))
|
||||
if err = c.dirsCache.Delete(path.Dir(id)); err != nil {
|
||||
c.logger.Error().Err(err).Msg("failed to clear dirs cache")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init instantiates the caches
|
||||
func (c *CachedMDC) Init(ctx context.Context, id string) error {
|
||||
c.dirs = initCache(cachettl)
|
||||
c.files = initCache(cachettl)
|
||||
c.dirsCache = store.Create(
|
||||
store.Store(c.cfg.Metadata.Cache.Store),
|
||||
store.TTL(c.cfg.Metadata.Cache.TTL),
|
||||
store.Size(c.cfg.Metadata.Cache.Size),
|
||||
microstore.Nodes(c.cfg.Metadata.Cache.Nodes...),
|
||||
microstore.Database(c.cfg.Metadata.Cache.Database),
|
||||
microstore.Table(c.cfg.Metadata.Cache.DirectoryTable),
|
||||
)
|
||||
c.filesCache = store.Create(
|
||||
store.Store(c.cfg.Metadata.Cache.Store),
|
||||
store.TTL(c.cfg.Metadata.Cache.TTL),
|
||||
store.Size(c.cfg.Metadata.Cache.Size),
|
||||
microstore.Nodes(c.cfg.Metadata.Cache.Nodes...),
|
||||
microstore.Database(c.cfg.Metadata.Cache.Database),
|
||||
microstore.Table(c.cfg.Metadata.Cache.FileTable),
|
||||
)
|
||||
return c.next.Init(ctx, id)
|
||||
}
|
||||
|
||||
func initCache(ttlSeconds int) *ttlcache.Cache {
|
||||
cache := ttlcache.NewCache()
|
||||
_ = cache.SetTTL(time.Duration(ttlSeconds) * time.Second)
|
||||
cache.SkipTTLExtensionOnHit(true)
|
||||
return cache
|
||||
}
|
||||
|
||||
func removePrefix(cache *ttlcache.Cache, prefix string) error {
|
||||
for _, k := range cache.GetKeys() {
|
||||
if strings.HasPrefix(k, prefix) {
|
||||
if err := cache.Remove(k); err != nil {
|
||||
return err
|
||||
}
|
||||
func (c *CachedMDC) removePrefix(cache microstore.Store, prefix string) error {
|
||||
c.logger.Error().Str("prefix", prefix).Msg("removePrefix")
|
||||
keys, err := cache.List(microstore.ListPrefix(prefix))
|
||||
if err != nil {
|
||||
c.logger.Error().Err(err).Msg("failed to list cache entries")
|
||||
}
|
||||
for _, k := range keys {
|
||||
c.logger.Error().Str("key", k).Msg("removePrefix")
|
||||
if err := cache.Delete(k); err != nil {
|
||||
c.logger.Error().Err(err).Msg("failed to remove prefix from cache")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -60,7 +60,11 @@ func (s *Store) Init() {
|
||||
return
|
||||
}
|
||||
|
||||
mdc := &CachedMDC{next: NewMetadataClient(s.cfg.Metadata)}
|
||||
mdc := &CachedMDC{
|
||||
next: NewMetadataClient(s.cfg.Metadata),
|
||||
cfg: s.cfg,
|
||||
logger: s.Logger,
|
||||
}
|
||||
if err := s.initMetadataClient(mdc); err != nil {
|
||||
s.Logger.Error().Err(err).Msg("error initializing metadata client")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user