This commit is contained in:
Jarek Kowalski
2017-10-29 11:42:11 -07:00
parent 0f6c569435
commit f70d2af03d
2 changed files with 89 additions and 89 deletions

View File

@@ -25,103 +25,103 @@ type metadataCache struct {
cachedData map[string][]byte
}
func (mc *metadataCache) ListBlocks(prefix string) ([]string, error) {
func (c *metadataCache) ListBlocks(prefix string) ([]string, error) {
var result []string
mc.mu.Lock()
p := sort.SearchStrings(mc.sortedNames, prefix)
for p < len(mc.sortedNames) && strings.HasPrefix(mc.sortedNames[p], prefix) {
if !isReservedName(mc.sortedNames[p]) {
result = append(result, mc.sortedNames[p])
c.mu.Lock()
p := sort.SearchStrings(c.sortedNames, prefix)
for p < len(c.sortedNames) && strings.HasPrefix(c.sortedNames[p], prefix) {
if !isReservedName(c.sortedNames[p]) {
result = append(result, c.sortedNames[p])
}
p++
}
mc.mu.Unlock()
c.mu.Unlock()
return result, nil
}
func (mc *metadataCache) GetBlock(name string) ([]byte, error) {
mc.mu.Lock()
cid := mc.nameToCacheID[name]
func (c *metadataCache) GetBlock(name string) ([]byte, error) {
c.mu.Lock()
cid := c.nameToCacheID[name]
if cid == "" {
mc.mu.Unlock()
c.mu.Unlock()
return nil, storage.ErrBlockNotFound
}
// see if the data is cached
if data, ok := mc.cachedData[cid]; ok {
mc.mu.Unlock()
if data, ok := c.cachedData[cid]; ok {
c.mu.Unlock()
return cloneBytes(data), nil
}
mc.mu.Unlock()
c.mu.Unlock()
// not cached, fetch from the storage
b, err := mc.st.GetBlock(MetadataBlockPrefix+name, 0, -1)
b, err := c.st.GetBlock(MetadataBlockPrefix+name, 0, -1)
if err != nil {
return nil, err
}
// now race to add to cache, does not matter who wins.
mc.mu.Lock()
mc.cachedData[cid] = b
mc.mu.Unlock()
c.mu.Lock()
c.cachedData[cid] = b
c.mu.Unlock()
return cloneBytes(b), nil
}
func (mc *metadataCache) PutBlock(name string, data []byte) error {
if err := mc.st.PutBlock(MetadataBlockPrefix+name, data); err != nil {
func (c *metadataCache) PutBlock(name string, data []byte) error {
if err := c.st.PutBlock(MetadataBlockPrefix+name, data); err != nil {
return err
}
b := make([]byte, 8)
io.ReadFull(rand.Reader, b)
mc.mu.Lock()
c.mu.Lock()
cid := fmt.Sprintf("%v-new-%x", name, b)
mc.nameToCacheID[name] = cid
p := sort.SearchStrings(mc.sortedNames, name)
if p >= len(mc.sortedNames) || mc.sortedNames[p] != name {
c.nameToCacheID[name] = cid
p := sort.SearchStrings(c.sortedNames, name)
if p >= len(c.sortedNames) || c.sortedNames[p] != name {
// Name not present, p is the index where p should be inserted.
mc.sortedNames = append(append(
append([]string(nil), mc.sortedNames[0:p]...),
c.sortedNames = append(append(
append([]string(nil), c.sortedNames[0:p]...),
name),
mc.sortedNames[p:]...)
c.sortedNames[p:]...)
}
mc.mu.Unlock()
c.mu.Unlock()
return nil
}
func (mc *metadataCache) DeleteBlock(name string) error {
mc.mu.Lock()
if cid := mc.nameToCacheID[name]; cid != "" {
delete(mc.nameToCacheID, name)
delete(mc.cachedData, cid)
func (c *metadataCache) DeleteBlock(name string) error {
c.mu.Lock()
if cid := c.nameToCacheID[name]; cid != "" {
delete(c.nameToCacheID, name)
delete(c.cachedData, cid)
// Delete from sortedNames
p := sort.SearchStrings(mc.sortedNames, name)
if p < len(mc.sortedNames) && mc.sortedNames[p] == name {
p := sort.SearchStrings(c.sortedNames, name)
if p < len(c.sortedNames) && c.sortedNames[p] == name {
// Found at index 'p' build a new slice from [0..p), [p+1,...)
newSlice := mc.sortedNames[0:p]
for _, n := range mc.sortedNames[p+1:] {
newSlice := c.sortedNames[0:p]
for _, n := range c.sortedNames[p+1:] {
newSlice = append(newSlice, n)
}
mc.sortedNames = newSlice
c.sortedNames = newSlice
}
}
mc.mu.Unlock()
c.mu.Unlock()
return mc.st.DeleteBlock(MetadataBlockPrefix + name)
return c.st.DeleteBlock(MetadataBlockPrefix + name)
}
// refresh refreshes the list of blocks in the cache, but does not load or expire previously cached.
func (mc *metadataCache) refresh() error {
func (c *metadataCache) refresh() error {
var sortedNames []string
nameToCacheID := map[string]string{}
ch, cancel := mc.st.ListBlocks(MetadataBlockPrefix)
ch, cancel := c.st.ListBlocks(MetadataBlockPrefix)
defer cancel()
for it := range ch {
if it.Error != nil {
@@ -133,15 +133,15 @@ func (mc *metadataCache) refresh() error {
nameToCacheID[n] = fmt.Sprintf("%v-%v-%v", it.BlockID, it.Length, it.TimeStamp.UnixNano())
}
mc.setLoaded(sortedNames, nameToCacheID)
c.setLoaded(sortedNames, nameToCacheID)
return nil
}
func (mc *metadataCache) setLoaded(sortedNames []string, nameToCacheID map[string]string) {
mc.mu.Lock()
mc.sortedNames = sortedNames
mc.nameToCacheID = nameToCacheID
mc.mu.Unlock()
func (c *metadataCache) setLoaded(sortedNames []string, nameToCacheID map[string]string) {
c.mu.Lock()
c.sortedNames = sortedNames
c.nameToCacheID = nameToCacheID
c.mu.Unlock()
}
func cloneBytes(d []byte) []byte {

View File

@@ -62,24 +62,24 @@ type Manager struct {
}
// Put saves the specified metadata content under a provided name.
func (mm *Manager) Put(itemID string, content []byte) error {
func (m *Manager) Put(itemID string, content []byte) error {
if err := checkReservedName(itemID); err != nil {
return err
}
return mm.writeEncryptedBlock(itemID, content)
return m.writeEncryptedBlock(itemID, content)
}
// RefreshCache refreshes the cache of metadata items.
func (mm *Manager) RefreshCache() error {
return mm.cache.refresh()
func (m *Manager) RefreshCache() error {
return m.cache.refresh()
}
func (mm *Manager) writeEncryptedBlock(itemID string, content []byte) error {
if mm.aead != nil {
nonceLength := mm.aead.NonceSize()
func (m *Manager) writeEncryptedBlock(itemID string, content []byte) error {
if m.aead != nil {
nonceLength := m.aead.NonceSize()
noncePlusContentLength := nonceLength + len(content)
cipherText := make([]byte, noncePlusContentLength+mm.aead.Overhead())
cipherText := make([]byte, noncePlusContentLength+m.aead.Overhead())
// Store nonce at the beginning of ciphertext.
nonce := cipherText[0:nonceLength]
@@ -87,16 +87,16 @@ func (mm *Manager) writeEncryptedBlock(itemID string, content []byte) error {
return err
}
b := mm.aead.Seal(cipherText[nonceLength:nonceLength], nonce, content, mm.authData)
b := m.aead.Seal(cipherText[nonceLength:nonceLength], nonce, content, m.authData)
content = nonce[0 : nonceLength+len(b)]
}
return mm.cache.PutBlock(itemID, content)
return m.cache.PutBlock(itemID, content)
}
func (mm *Manager) readEncryptedBlock(itemID string) ([]byte, error) {
content, err := mm.cache.GetBlock(itemID)
func (m *Manager) readEncryptedBlock(itemID string) ([]byte, error) {
content, err := m.cache.GetBlock(itemID)
if err != nil {
if err == storage.ErrBlockNotFound {
return nil, ErrNotFound
@@ -104,31 +104,31 @@ func (mm *Manager) readEncryptedBlock(itemID string) ([]byte, error) {
return nil, fmt.Errorf("unexpected error reading %v: %v", itemID, err)
}
return mm.decryptBlock(content)
return m.decryptBlock(content)
}
func (mm *Manager) decryptBlock(content []byte) ([]byte, error) {
if mm.aead != nil {
nonce := content[0:mm.aead.NonceSize()]
payload := content[mm.aead.NonceSize():]
return mm.aead.Open(payload[:0], nonce, payload, mm.authData)
func (m *Manager) decryptBlock(content []byte) ([]byte, error) {
if m.aead != nil {
nonce := content[0:m.aead.NonceSize()]
payload := content[m.aead.NonceSize():]
return m.aead.Open(payload[:0], nonce, payload, m.authData)
}
return content, nil
}
// GetMetadata returns the contents of a specified metadata item.
func (mm *Manager) GetMetadata(itemID string) ([]byte, error) {
func (m *Manager) GetMetadata(itemID string) ([]byte, error) {
if err := checkReservedName(itemID); err != nil {
return nil, err
}
return mm.readEncryptedBlock(itemID)
return m.readEncryptedBlock(itemID)
}
// MultiGet gets the contents of a specified multiple metadata items efficiently.
// The results are returned as a map, with items that are not found not present in the map.
func (mm *Manager) MultiGet(itemIDs []string) (map[string][]byte, error) {
func (m *Manager) MultiGet(itemIDs []string) (map[string][]byte, error) {
type singleReadResult struct {
id string
contents []byte
@@ -140,7 +140,7 @@ type singleReadResult struct {
for i := 0; i < parallelFetches; i++ {
go func() {
for itemID := range inputs {
v, err := mm.GetMetadata(itemID)
v, err := m.GetMetadata(itemID)
ch <- singleReadResult{itemID, v, err}
}
}()
@@ -174,8 +174,8 @@ type singleReadResult struct {
}
// GetJSON reads and parses given item as JSON.
func (mm *Manager) GetJSON(itemID string, content interface{}) error {
j, err := mm.readEncryptedBlock(itemID)
func (m *Manager) GetJSON(itemID string, content interface{}) error {
j, err := m.readEncryptedBlock(itemID)
if err != nil {
return err
}
@@ -184,41 +184,41 @@ func (mm *Manager) GetJSON(itemID string, content interface{}) error {
}
// PutJSON stores the contents of an item stored with a given ID.
func (mm *Manager) PutJSON(id string, content interface{}) error {
func (m *Manager) PutJSON(id string, content interface{}) error {
j, err := json.Marshal(content)
if err != nil {
return err
}
return mm.writeEncryptedBlock(id, j)
return m.writeEncryptedBlock(id, j)
}
// List returns the list of metadata items matching the specified prefix.
func (mm *Manager) List(prefix string) ([]string, error) {
return mm.cache.ListBlocks(prefix)
func (m *Manager) List(prefix string) ([]string, error) {
return m.cache.ListBlocks(prefix)
}
// ListContents retrieves metadata contents for all items starting with a given prefix.
func (mm *Manager) ListContents(prefix string) (map[string][]byte, error) {
itemIDs, err := mm.List(prefix)
func (m *Manager) ListContents(prefix string) (map[string][]byte, error) {
itemIDs, err := m.List(prefix)
if err != nil {
return nil, err
}
return mm.MultiGet(itemIDs)
return m.MultiGet(itemIDs)
}
// Remove removes the specified metadata item.
func (mm *Manager) Remove(itemID string) error {
func (m *Manager) Remove(itemID string) error {
if err := checkReservedName(itemID); err != nil {
return err
}
return mm.cache.DeleteBlock(itemID)
return m.cache.DeleteBlock(itemID)
}
// RemoveMany efficiently removes multiple metadata items in parallel.
func (mm *Manager) RemoveMany(itemIDs []string) error {
func (m *Manager) RemoveMany(itemIDs []string) error {
parallelism := 30
ch := make(chan string)
var wg sync.WaitGroup
@@ -230,7 +230,7 @@ func (mm *Manager) RemoveMany(itemIDs []string) error {
defer wg.Done()
for id := range ch {
if err := mm.Remove(id); err != nil {
if err := m.Remove(id); err != nil {
errch <- err
}
}
@@ -254,32 +254,32 @@ func NewManager(st storage.Storage, f Format, km *auth.KeyManager) (*Manager, er
return nil, err
}
mm := &Manager{
m := &Manager{
Format: f,
storage: st,
cache: cache,
}
if err := mm.initCrypto(f, km); err != nil {
if err := m.initCrypto(f, km); err != nil {
return nil, fmt.Errorf("unable to initialize crypto: %v", err)
}
return mm, nil
return m, nil
}
func (mm *Manager) initCrypto(f Format, km *auth.KeyManager) error {
func (m *Manager) initCrypto(f Format, km *auth.KeyManager) error {
switch f.EncryptionAlgorithm {
case "NONE": // do nothing
return nil
case "AES256_GCM":
aesKey := km.DeriveKey(purposeAESKey, 32)
mm.authData = km.DeriveKey(purposeAuthData, 32)
m.authData = km.DeriveKey(purposeAuthData, 32)
blk, err := aes.NewCipher(aesKey)
if err != nil {
return fmt.Errorf("cannot create cipher: %v", err)
}
mm.aead, err = cipher.NewGCM(blk)
m.aead, err = cipher.NewGCM(blk)
if err != nil {
return fmt.Errorf("cannot create cipher: %v", err)
}