This commit is contained in:
Jarek Kowalski
2016-03-21 20:08:35 -07:00
commit d3a68a7055
15 changed files with 1237 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.DS_Store

8
Makefile Normal file
View File

@@ -0,0 +1,8 @@
build:
go install github.com/kopia/kopia/cmd/kopia
deps:
go get -u -t -v github.com/kopia/kopia/...
test:
go test -timeout 30s github.com/kopia/kopia/...

42
storage/config.go Normal file
View File

@@ -0,0 +1,42 @@
package storage
import (
	"encoding/json"
	"fmt"
)
// RepositoryConfiguration is a JSON-serializable description of Repository and its configuration.
type RepositoryConfiguration struct {
Type string
Config interface{}
}
// UnmarshalJSON parses the JSON-encoded data into RepositoryConfiguration.
// UnmarshalJSON parses the JSON-encoded data into RepositoryConfiguration.
// It decodes the "type" field first, instantiates that type's default config
// via the registered factory and then decodes the "config" payload into it.
func (c *RepositoryConfiguration) UnmarshalJSON(b []byte) error {
	raw := struct {
		Type string          `json:"type"`
		Data json.RawMessage `json:"config"`
	}{}
	if err := json.Unmarshal(b, &raw); err != nil {
		return err
	}
	// Guard against unregistered types: the original indexed 'factories'
	// unconditionally and called a nil defaultConfigFunc, which panics.
	factory, ok := factories[raw.Type]
	if !ok {
		return fmt.Errorf("unknown repository type: %s", raw.Type)
	}
	c.Type = raw.Type
	c.Config = factory.defaultConfigFunc()
	if err := json.Unmarshal(raw.Data, c.Config); err != nil {
		return err
	}
	return nil
}
// MarshalJSON returns JSON-encoded repository configuration.
// MarshalJSON returns JSON-encoded repository configuration.
func (c *RepositoryConfiguration) MarshalJSON() ([]byte, error) {
	wire := struct {
		Type string      `json:"type"`
		Data interface{} `json:"config"`
	}{
		Type: c.Type,
		Data: c.Config,
	}
	return json.Marshal(&wire)
}

1
storage/constants.go Normal file
View File

@@ -0,0 +1 @@
package storage

2
storage/doc.go Normal file
View File

@@ -0,0 +1,2 @@
// Package storage implements repositories for connecting to various types of storage backends.
package storage

16
storage/errors.go Normal file
View File

@@ -0,0 +1,16 @@
package storage
import "errors"
var (
// ErrBlockNotFound is returned when a block cannot be found in repository.
ErrBlockNotFound = errors.New("block not found")
// ErrInvalidChecksum is returned when a repository block is invalid, which may indicate
// that decryption has failed.
ErrInvalidChecksum = errors.New("invalid checksum")
// ErrWriteLimitExceeded is returned when the maximum amount of data has already been written
// to the repository.
ErrWriteLimitExceeded = errors.New("write limit exceeded")
)

260
storage/filesystem.go Normal file
View File

@@ -0,0 +1,260 @@
package storage
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
)
const (
fsRepositoryType = "fs"
fsRepositoryChunkSuffix = ".f"
)
var (
fsDefaultShards = []int{1, 3, 3}
fsDefaultFileMode os.FileMode = 0664
fsDefaultDirMode os.FileMode = 0775
)
type fsRepository struct {
FSRepositoryOptions
}
// FSRepositoryOptions defines options for Filesystem-backed repository.
type FSRepositoryOptions struct {
Path string `json:"path"`
DirectoryShards []int `json:"dirShards"`
FileMode os.FileMode `json:"fileMode"`
DirectoryMode os.FileMode `json:"dirMode"`
FileUID *int `json:"uid,omitempty"`
FileGID *int `json:"gid,omitempty"`
}
// BlockExists reports whether the block's backing file is present on disk.
func (fs *fsRepository) BlockExists(blockID BlockID) (bool, error) {
	_, path := fs.getShardedPathAndFilePath(blockID)
	switch _, err := os.Stat(path); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		// A missing file simply means the block does not exist.
		return false, nil
	default:
		return false, err
	}
}
// GetBlock reads and returns the contents of the given block, or
// ErrBlockNotFound when the backing file does not exist.
func (fs *fsRepository) GetBlock(blockID BlockID) ([]byte, error) {
	_, path := fs.getShardedPathAndFilePath(blockID)
	contents, err := ioutil.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, ErrBlockNotFound
		}
		return nil, err
	}
	return contents, nil
}
// getBlockIDFromFileName maps an on-disk file name back to a block ID.
// It returns false when the name does not carry the chunk suffix.
func getBlockIDFromFileName(name string) (BlockID, bool) {
	if !strings.HasSuffix(name, fsRepositoryChunkSuffix) {
		return BlockID(""), false
	}
	return BlockID(strings.TrimSuffix(name, fsRepositoryChunkSuffix)), true
}
// makeFileName maps a block ID to its on-disk file name by appending the
// repository chunk suffix (the inverse of getBlockIDFromFileName).
func makeFileName(blockID BlockID) string {
	return string(blockID) + fsRepositoryChunkSuffix
}
// ListBlocks streams metadata for all blocks whose ID starts with prefix.
// The unbuffered result channel is fed by a goroutine walking the shard
// directory tree and is closed when the walk completes.
func (fs *fsRepository) ListBlocks(prefix BlockID) chan (BlockMetadata) {
result := make(chan (BlockMetadata))
prefixString := string(prefix)
var walkDir func(string, string)
// walkDir recursively descends the shard tree; currentPrefix is the
// concatenation of directory names on the path so far, i.e. the leading
// characters of every block ID stored underneath this directory.
walkDir = func(directory string, currentPrefix string) {
if entries, err := ioutil.ReadDir(directory); err == nil {
//log.Println("Walking", directory, "looking for", prefix)
for _, e := range entries {
if e.IsDir() {
newPrefix := currentPrefix + e.Name()
var match bool
// Descend only into directories that can contain IDs with the
// requested prefix; compare whichever string is shorter.
if len(prefixString) > len(newPrefix) {
match = strings.HasPrefix(prefixString, newPrefix)
} else {
match = strings.HasPrefix(newPrefix, prefixString)
}
if match {
walkDir(directory+"/"+e.Name(), currentPrefix+e.Name())
}
} else if fullID, ok := getBlockIDFromFileName(currentPrefix + e.Name()); ok {
// Regular file: rebuild the full block ID from path + name
// and emit it if it matches the requested prefix.
if strings.HasPrefix(string(fullID), prefixString) {
result <- BlockMetadata{
BlockID: BlockID(fullID),
Length: uint64(e.Size()),
TimeStamp: e.ModTime(),
}
}
}
}
}
// NOTE(review): ReadDir errors are silently swallowed here, so the caller
// cannot distinguish "no blocks" from "unreadable directory" - confirm
// whether an Error entry should be emitted instead.
}
walkDirAndClose := func(directory string) {
walkDir(directory, "")
close(result)
}
go walkDirAndClose(fs.Path)
return result
}
// PutBlock atomically stores the block: data is written to a temporary file
// which is then renamed into place, so concurrent readers never observe a
// partially-written block.
func (fs *fsRepository) PutBlock(blockID BlockID, data io.ReadCloser, options PutOptions) error {
	// Close the data reader regardless of whether we use it or not.
	defer data.Close()

	shardPath, path := fs.getShardedPathAndFilePath(blockID)

	// Open temporary file, create the shard directory on demand.
	tempFile := fmt.Sprintf("%s.tmp.%d", path, rand.Int())
	flags := os.O_CREATE | os.O_WRONLY | os.O_EXCL
	f, err := os.OpenFile(tempFile, flags, fs.FileMode)
	if os.IsNotExist(err) {
		if err = os.MkdirAll(shardPath, fs.DirectoryMode); err != nil {
			return fmt.Errorf("cannot create directory: %v", err)
		}
		f, err = os.OpenFile(tempFile, flags, fs.FileMode)
	}
	if err != nil {
		return fmt.Errorf("cannot create temporary file: %v", err)
	}

	// Copy data to the temporary file and propagate write/close failures.
	// The original ignored both errors, which could silently persist a
	// truncated block after the rename below.
	_, copyErr := io.Copy(f, data)
	closeErr := f.Close()
	if copyErr != nil || closeErr != nil {
		os.Remove(tempFile)
		if copyErr != nil {
			return fmt.Errorf("cannot write temporary file: %v", copyErr)
		}
		return fmt.Errorf("cannot close temporary file: %v", closeErr)
	}

	if err = os.Rename(tempFile, path); err != nil {
		os.Remove(tempFile)
		return err
	}

	// Optionally adjust ownership, but only when running as root.
	if fs.FileUID != nil && fs.FileGID != nil && os.Geteuid() == 0 {
		os.Chown(path, *fs.FileUID, *fs.FileGID)
	}
	return nil
}
// DeleteBlock removes the block's backing file. Deleting a block that does
// not exist is not an error.
func (fs *fsRepository) DeleteBlock(blockID BlockID) error {
	_, path := fs.getShardedPathAndFilePath(blockID)
	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}
// Flush is a no-op: filesystem writes are completed synchronously by PutBlock.
func (fs *fsRepository) Flush() error {
return nil
}
// getShardDirectory splits the block ID into a shard directory path and the
// remaining (unsharded) portion of the ID. Short IDs (< 20 chars) are stored
// unsharded at the repository root.
func (fs *fsRepository) getShardDirectory(blockID BlockID) (string, BlockID) {
	dir := fs.Path
	id := string(blockID)
	if len(id) < 20 {
		return dir, blockID
	}
	// Peel off one path component per configured shard size.
	for _, n := range fs.DirectoryShards {
		dir = filepath.Join(dir, id[:n])
		id = id[n:]
	}
	return dir, BlockID(id)
}
// getShardedPathAndFilePath returns both the shard directory and the full
// path of the file that stores the given block.
func (fs *fsRepository) getShardedPathAndFilePath(blockID BlockID) (string, string) {
	dir, remainder := fs.getShardDirectory(blockID)
	return dir, filepath.Join(dir, makeFileName(remainder))
}
// parseShardString converts a comma-separated list of shard sizes (e.g.
// "1,3,3") into a slice of ints; an empty string selects the defaults.
func parseShardString(shardString string) ([]int, error) {
	if shardString == "" {
		// By default Xabcdefghijklmnop is stored in 'X/abc/def/Xabcdefghijklmnop'
		return fsDefaultShards, nil
	}
	parts := strings.Split(shardString, ",")
	result := make([]int, 0, len(parts))
	for _, p := range parts {
		n, err := strconv.ParseInt(p, 10, 32)
		if err != nil {
			return nil, fmt.Errorf("invalid shard specification: '%s'", p)
		}
		result = append(result, int(n))
	}
	return result, nil
}
// Configuration returns the type name and options needed to reopen this
// repository later.
func (fs *fsRepository) Configuration() RepositoryConfiguration {
	return RepositoryConfiguration{
		Type:   fsRepositoryType,
		Config: &fs.FSRepositoryOptions,
	}
}
// NewFSRepository creates new fs-backed repository in a specified directory.
// The directory must already exist; unset shard/file-mode/dir-mode options
// are filled in with package defaults.
func NewFSRepository(options *FSRepositoryOptions) (Repository, error) {
var err error
if _, err = os.Stat(options.Path); err != nil {
return nil, fmt.Errorf("cannot access repository path: %v", err)
}
// Copy the options so later mutation of the caller's struct has no effect.
r := &fsRepository{
FSRepositoryOptions: *options,
}
if r.DirectoryShards == nil {
r.DirectoryShards = fsDefaultShards
}
if r.DirectoryMode == 0 {
r.DirectoryMode = fsDefaultDirMode
}
if r.FileMode == 0 {
r.FileMode = fsDefaultFileMode
}
return r, nil
}
func init() {
AddSupportedRepository(
fsRepositoryType,
func() interface{} { return &FSRepositoryOptions{} },
func(cfg interface{}) (Repository, error) {
return NewFSRepository(cfg.(*FSRepositoryOptions))
})
}

384
storage/gcs.go Normal file
View File

@@ -0,0 +1,384 @@
package storage
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"time"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
gcsclient "google.golang.org/api/storage/v1"
)
const (
gcsRepositoryType = "gcs"
gcsTokenCacheDir = ".kopia"
// Those are not really secret, since the app is installed.
googleCloudClientID = "194841383482-nmn10h4mnllnsvou7qr55tfh5jsmtkap.apps.googleusercontent.com"
googleCloudClientSecret = "ZL52E96Q7iRCD9YXVA7U6UaI"
)
// GCSRepositoryOptions defines options Google Cloud Storage-backed repository.
type GCSRepositoryOptions struct {
// BucketName is the name of the GCS bucket where data is stored.
BucketName string `json:"bucket"`
// Prefix specifies additional string to prepend to all objects.
Prefix string `json:"prefix,omitempty"`
// TokenCacheFile is the name of the file that will persist the OAuth2 token.
// If not specified, the token will be persisted in GCSRepositoryOptions.
TokenCacheFile string `json:"tokenCacheFile,omitempty"`
// Token stored the OAuth2 token (when TokenCacheFile is empty)
Token *oauth2.Token `json:"token,omitempty"`
// ReadOnly causes the repository to be configured without write permissions, to prevent accidental
// modifications to the data.
ReadOnly bool `json:"readonly"`
// IgnoreDefaultCredentials disables the use of credentials managed by Google Cloud SDK (gcloud).
IgnoreDefaultCredentials bool `json:"ignoreDefaultCredentials"`
}
type gcsRepository struct {
GCSRepositoryOptions
objectsService *gcsclient.ObjectsService
}
// BlockExists reports whether the given block exists in the bucket.
func (gcs *gcsRepository) BlockExists(b BlockID) (bool, error) {
	_, err := gcs.objectsService.Get(gcs.BucketName, gcs.getObjectNameString(b)).Do()
	if err == nil {
		return true, nil
	}
	// Treat HTTP 404 as "does not exist" rather than an error, mirroring
	// GetBlock's handling; the original returned the raw error, so callers
	// could not distinguish a missing block from a failed lookup.
	if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == 404 {
		return false, nil
	}
	return false, err
}
// GetBlock downloads and returns the contents of the given block;
// a 404 from GCS is mapped to ErrBlockNotFound.
func (gcs *gcsRepository) GetBlock(b BlockID) ([]byte, error) {
	resp, err := gcs.objectsService.Get(gcs.BucketName, gcs.getObjectNameString(b)).Download()
	if err != nil {
		if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == 404 {
			return nil, ErrBlockNotFound
		}
		return nil, fmt.Errorf("unable to get block '%s': %v", b, err)
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}
// PutBlock uploads the block to GCS under its prefixed object name.
// IfGenerationMatch(0) makes the insert succeed only when the object does not
// already exist, so existing blocks are never overwritten.
// NOTE(review): options.Overwrite is ignored here - confirm that is intended.
func (gcs *gcsRepository) PutBlock(b BlockID, data io.ReadCloser, options PutOptions) error {
object := gcsclient.Object{
Name: gcs.getObjectNameString(b),
}
defer data.Close()
_, err := gcs.objectsService.Insert(gcs.BucketName, &object).
IfGenerationMatch(0).
Media(data).
Do()
return err
}
// DeleteBlock removes the given block from the bucket.
func (gcs *gcsRepository) DeleteBlock(b BlockID) error {
	// Use the prefixed object name for consistency with GetBlock/PutBlock;
	// the original passed the raw block ID, so deletions missed the actual
	// objects whenever a Prefix was configured.
	err := gcs.objectsService.Delete(gcs.BucketName, gcs.getObjectNameString(b)).Do()
	if err != nil {
		return fmt.Errorf("unable to delete block %s: %v", b, err)
	}
	return nil
}
// getObjectNameString maps a block ID to its GCS object name by prepending
// the configured prefix.
func (gcs *gcsRepository) getObjectNameString(b BlockID) string {
	return gcs.Prefix + string(b)
}
// ListBlocks streams metadata for all blocks whose ID starts with prefix.
// Listing runs in a goroutine; list/parsing failures are reported as
// BlockMetadata entries with Error set, and the channel is closed when done.
func (gcs *gcsRepository) ListBlocks(prefix BlockID) chan (BlockMetadata) {
	ch := make(chan BlockMetadata, 100)
	go func() {
		defer close(ch)
		ps := gcs.getObjectNameString(prefix)
		page, err := gcs.objectsService.List(gcs.BucketName).
			Prefix(ps).Do()
		for {
			// The original ignored list errors and dereferenced a nil page.
			if err != nil {
				ch <- BlockMetadata{Error: err}
				return
			}
			for _, o := range page.Items {
				t, e := time.Parse(time.RFC3339, o.TimeCreated)
				if e != nil {
					ch <- BlockMetadata{
						Error: e,
					}
				} else {
					// Strip the repository prefix to recover the block ID.
					ch <- BlockMetadata{
						BlockID:   BlockID(o.Name)[len(gcs.Prefix):],
						Length:    o.Size,
						TimeStamp: t,
					}
				}
			}
			if page.NextPageToken == "" {
				return
			}
			// The original passed the prefix string to PageToken, so
			// pagination never advanced past the first page.
			page, err = gcs.objectsService.List(gcs.BucketName).
				PageToken(page.NextPageToken).
				Prefix(ps).Do()
		}
	}()
	return ch
}
// Flush is a no-op: uploads are completed synchronously by PutBlock.
func (gcs *gcsRepository) Flush() error {
return nil
}
// Configuration returns the type name and options needed to reopen this
// repository later.
func (gcs *gcsRepository) Configuration() RepositoryConfiguration {
	return RepositoryConfiguration{
		Type:   gcsRepositoryType,
		Config: &gcs.GCSRepositoryOptions,
	}
}
// tokenFromFile loads a cached OAuth2 token from the given JSON file.
func tokenFromFile(file string) (*oauth2.Token, error) {
	f, err := os.Open(file)
	if err != nil {
		return nil, err
	}
	// The original never closed the file, leaking the handle.
	defer f.Close()
	t := oauth2.Token{}
	err = json.NewDecoder(f).Decode(&t)
	return &t, err
}
// saveToken caches the OAuth2 token in the given file. Failures are logged
// but not fatal - caching is best-effort by design.
func saveToken(file string, token *oauth2.Token) {
	f, err := os.Create(file)
	if err != nil {
		log.Printf("Warning: failed to cache oauth token: %v", err)
		return
	}
	defer f.Close()
	// Log (rather than silently drop) encoding failures so a corrupt or
	// partial cache file does not go unnoticed.
	if err := json.NewEncoder(f).Encode(token); err != nil {
		log.Printf("Warning: failed to encode oauth token: %v", err)
	}
}
// NewGCSRepository creates new Google Cloud Storage-backed repository with specified options:
//
// - the 'BucketName' field is required and all other parameters are optional.
//
// By default the connection reuses credentials managed by (https://cloud.google.com/sdk/),
// but this can be disabled by setting IgnoreDefaultCredentials to true.
func NewGCSRepository(options *GCSRepositoryOptions) (Repository, error) {
ctx := context.TODO()
// Copy the options so later mutation of the caller's struct has no effect.
gcs := &gcsRepository{
GCSRepositoryOptions: *options,
}
if gcs.BucketName == "" {
return nil, errors.New("bucket name must be specified")
}
// Request the narrowest OAuth2 scope that satisfies the options.
var scope string
if options.ReadOnly {
scope = gcsclient.DevstorageReadOnlyScope
} else {
scope = gcsclient.DevstorageReadWriteScope
}
// Try to get default client if possible and not disabled by options.
var client *http.Client
var err error
if !gcs.IgnoreDefaultCredentials {
client, _ = google.DefaultClient(context.TODO(), scope)
}
if client == nil {
// Fall back to asking user to authenticate.
config := &oauth2.Config{
ClientID: googleCloudClientID,
ClientSecret: googleCloudClientSecret,
Endpoint: google.Endpoint,
Scopes: []string{scope},
}
var token *oauth2.Token
if gcs.Token != nil {
// Token was provided, use it.
token = gcs.Token
} else {
if gcs.TokenCacheFile == "" {
// Cache file not provided, token will be saved in repository configuration.
token, err = tokenFromWeb(ctx, config)
if err != nil {
return nil, fmt.Errorf("cannot retrieve OAuth2 token: %v", err)
}
gcs.Token = token
} else {
// Try the cached token first; on any failure fall back to the
// interactive web flow.
token, err = tokenFromFile(gcs.TokenCacheFile)
if err != nil {
token, err = tokenFromWeb(ctx, config)
if err != nil {
return nil, fmt.Errorf("cannot retrieve OAuth2 token: %v", err)
}
}
// NOTE(review): the token is re-saved even when it was just read
// successfully from the cache file - confirm the rewrite is intended.
saveToken(gcs.TokenCacheFile, token)
}
}
client = config.Client(ctx, token)
}
svc, err := gcsclient.New(client)
if err != nil {
return nil, fmt.Errorf("Unable to create GCS client: %v", err)
}
gcs.objectsService = svc.Objects
return gcs, nil
}
// readGcsTokenFromFile loads a cached OAuth2 token from the given JSON file.
func readGcsTokenFromFile(filePath string) (*oauth2.Token, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var token oauth2.Token
	if err := json.NewDecoder(f).Decode(&token); err != nil {
		return nil, fmt.Errorf("Unable to decode token: %v", err)
	}
	return &token, nil
}
// writeTokenToFile persists the OAuth2 token as JSON in the given file.
func writeTokenToFile(filePath string, token *oauth2.Token) error {
	f, err := os.Create(filePath)
	if err != nil {
		return err
	}
	defer f.Close()
	// Propagate encoding errors; the original discarded them and always
	// returned nil, so a partially-written token file went unnoticed.
	return json.NewEncoder(f).Encode(*token)
}
// tokenFromWeb runs the interactive OAuth2 authorization-code flow: it starts
// a local HTTP server to receive the redirect, opens the authorization URL in
// a browser, waits for the code and exchanges it for a token.
func tokenFromWeb(ctx context.Context, config *oauth2.Config) (*oauth2.Token, error) {
ch := make(chan string)
// Random state string guards against cross-site request forgery.
randState := fmt.Sprintf("st%d", time.Now().UnixNano())
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/favicon.ico" {
http.Error(rw, "", 404)
return
}
if req.FormValue("state") != randState {
log.Printf("State doesn't match: req = %#v", req)
http.Error(rw, "", 500)
return
}
if code := req.FormValue("code"); code != "" {
fmt.Fprintf(rw, "<h1>Success</h1>Authorized.")
rw.(http.Flusher).Flush()
// Hand the authorization code back to the waiting goroutine.
ch <- code
return
}
log.Printf("no code")
http.Error(rw, "", 500)
}))
defer ts.Close()
// The local server is the OAuth2 redirect target.
config.RedirectURL = ts.URL
authURL := config.AuthCodeURL(randState)
go openURL(authURL)
log.Printf("Authorize this app at: %s", authURL)
// Block until the redirect handler delivers the authorization code.
code := <-ch
log.Printf("Got code: %s", code)
token, err := config.Exchange(ctx, code)
if err != nil {
return nil, fmt.Errorf("token exchange error: %v", err)
}
return token, nil
}
// openURL attempts to open the given URL in a browser by trying a few
// well-known launchers; the first one that succeeds wins.
func openURL(url string) error {
	for _, bin := range []string{"xdg-open", "google-chrome", "open"} {
		if exec.Command(bin, url).Run() == nil {
			return nil
		}
	}
	log.Printf("Error opening URL in browser.")
	return fmt.Errorf("Error opening URL in browser")
}
// authPrompt prints the authorization URL and reads the authentication code
// from standard input via fmt.Scanf.
// NOTE(review): the local HTTP server and the 'ch' channel are set up but
// their result is never consumed (only Scanf's input is used) - confirm
// whether this helper is still needed or is dead code.
func authPrompt(url string, state string) (authenticationCode string, err error) {
ch := make(chan string)
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/favicon.ico" {
http.Error(rw, "", 404)
return
}
if req.FormValue("state") != state {
log.Printf("State doesn't match: req = %#v", req)
http.Error(rw, "", 500)
return
}
if code := req.FormValue("code"); code != "" {
fmt.Fprintf(rw, "<h1>Success</h1>Authorized.")
rw.(http.Flusher).Flush()
ch <- code
return
}
log.Printf("no code")
http.Error(rw, "", 500)
}))
defer ts.Close()
log.Println("Go to", url)
var code string
n, err := fmt.Scanf("%s", &code)
if n == 1 {
return code, nil
}
return "", err
}
func init() {
AddSupportedRepository(
gcsRepositoryType,
func() interface{} {
return &GCSRepositoryOptions{}
},
func(cfg interface{}) (Repository, error) {
return NewGCSRepository(cfg.(*GCSRepositoryOptions))
})
}

45
storage/limit.go Normal file
View File

@@ -0,0 +1,45 @@
package storage
import (
"io"
"sync/atomic"
)
type writeLimitRepository struct {
Repository
remainingBytes int64
}
type writeLimitReadCloser struct {
io.ReadCloser
repo *writeLimitRepository
}
// Read forwards to the wrapped reader and charges the bytes actually read
// against the repository's remaining write budget.
func (s *writeLimitReadCloser) Read(b []byte) (int, error) {
	n, err := s.ReadCloser.Read(b)
	atomic.AddInt64(&s.repo.remainingBytes, -int64(n))
	return n, err
}
// PutBlock rejects the write once the byte budget is exhausted (unless
// options.IgnoreLimits is set) and otherwise forwards it to the wrapped
// repository, wrapping the reader so bytes consumed are charged against
// the remaining budget.
func (s *writeLimitRepository) PutBlock(id BlockID, data io.ReadCloser, options PutOptions) error {
if !options.IgnoreLimits {
if atomic.LoadInt64(&s.remainingBytes) <= 0 {
return ErrWriteLimitExceeded
}
}
return s.Repository.PutBlock(id, &writeLimitReadCloser{
ReadCloser: data,
repo: s,
}, options)
}
// NewWriteLimitWrapper returns a Repository wrapper that limits the number of bytes written to a cas.
// Once reached, the writes will return ErrWriteLimitExceeded
func NewWriteLimitWrapper(wrapped Repository, bytes int64) Repository {
	r := &writeLimitRepository{Repository: wrapped}
	r.remainingBytes = bytes
	return r
}

53
storage/logging.go Normal file
View File

@@ -0,0 +1,53 @@
package storage
import (
"io"
"log"
)
type loggingRepository struct {
Repository
}
func (s *loggingRepository) BlockExists(id BlockID) (bool, error) {
result, err := s.Repository.BlockExists(id)
log.Printf("BlockExists(%#v)=%#v,%#v", id, result, err)
return result, err
}
// GetBlock delegates to the wrapped repository and logs the call; larger
// payloads are logged as a byte count rather than full contents.
func (s *loggingRepository) GetBlock(id BlockID) ([]byte, error) {
	result, err := s.Repository.GetBlock(id)
	if len(result) >= 20 {
		log.Printf("GetBlock(%#v)=({%#v bytes}, %#v)", id, len(result), err)
	} else {
		log.Printf("GetBlock(%#v)=(%#v, %#v)", id, result, err)
	}
	return result, err
}
func (s *loggingRepository) PutBlock(id BlockID, data io.ReadCloser, options PutOptions) error {
err := s.Repository.PutBlock(id, data, options)
log.Printf("PutBlock(%#v, %#v)=%#v", id, options, err)
return err
}
func (s *loggingRepository) DeleteBlock(id BlockID) error {
err := s.Repository.DeleteBlock(id)
log.Printf("DeleteBlock(%#v)=%#v", id, err)
return err
}
func (s *loggingRepository) ListBlocks(prefix BlockID) chan (BlockMetadata) {
log.Printf("ListBlocks(%#v)", prefix)
return s.Repository.ListBlocks(prefix)
}
func (s *loggingRepository) Flush() error {
log.Printf("Flush()")
return s.Repository.Flush()
}
// NewLoggingWrapper returns a Repository wrapper that logs all repository commands.
func NewLoggingWrapper(wrapped Repository) Repository {
return &loggingRepository{wrapped}
}

99
storage/map.go Normal file
View File

@@ -0,0 +1,99 @@
package storage
import (
"io"
"io/ioutil"
"sort"
"strings"
"sync"
"time"
)
type mapRepository struct {
data map[string][]byte
mutex sync.RWMutex
}
func (s *mapRepository) Configuration() RepositoryConfiguration {
return RepositoryConfiguration{}
}
func (s *mapRepository) BlockExists(id BlockID) (bool, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
_, ok := s.data[string(id)]
return ok, nil
}
// GetBlock returns the contents of the given block, or ErrBlockNotFound
// when the block is not present.
func (s *mapRepository) GetBlock(id BlockID) ([]byte, error) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	if contents, ok := s.data[string(id)]; ok {
		return contents, nil
	}
	return nil, ErrBlockNotFound
}
// PutBlock reads the entire stream and stores it under the given block ID.
// The data reader is always closed.
func (s *mapRepository) PutBlock(id BlockID, data io.ReadCloser, options PutOptions) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	contents, err := ioutil.ReadAll(data)
	data.Close()
	if err == nil {
		s.data[string(id)] = contents
	}
	return err
}
func (s *mapRepository) DeleteBlock(id BlockID) error {
s.mutex.Lock()
defer s.mutex.Unlock()
delete(s.data, string(id))
return nil
}
// ListBlocks returns a channel that yields metadata for all blocks whose ID
// has the given prefix, in sorted ID order. All entries share a single
// timestamp captured when the listing starts.
func (s *mapRepository) ListBlocks(prefix BlockID) chan (BlockMetadata) {
ch := make(chan (BlockMetadata))
fixedTime := time.Now()
go func() {
// NOTE(review): the read lock is held while sending on an unbuffered
// channel, so a slow consumer blocks all writers - confirm acceptable.
s.mutex.RLock()
defer s.mutex.RUnlock()
keys := []string{}
for k := range s.data {
if strings.HasPrefix(k, string(prefix)) {
keys = append(keys, k)
}
}
// Sort for deterministic output (map iteration order is random).
sort.Strings(keys)
for _, k := range keys {
v := s.data[k]
ch <- BlockMetadata{
BlockID: BlockID(k),
Length: uint64(len(v)),
TimeStamp: fixedTime,
}
}
close(ch)
}()
return ch
}
// Flush is a no-op: map writes are completed synchronously by PutBlock.
func (s *mapRepository) Flush() error {
return nil
}
// NewMapRepository returns an implementation of Repository backed by the contents of given map.
// Used primarily for testing.
func NewMapRepository(data map[string][]byte) Repository {
return &mapRepository{data: data}
}

35
storage/registry.go Normal file
View File

@@ -0,0 +1,35 @@
package storage
import "fmt"
var (
factories = map[string]repositoryFactory{}
)
// RepositoryFactory allows creation of repositories in a generic way.
type repositoryFactory struct {
defaultConfigFunc func() interface{}
createRepositoryFunc func(interface{}) (Repository, error)
}
// AddSupportedRepository registers factory function to create repository with a given type name.
func AddSupportedRepository(
repositoryType string,
defaultConfigFunc func() interface{},
createRepositoryFunc func(interface{}) (Repository, error)) {
factories[repositoryType] = repositoryFactory{
defaultConfigFunc: defaultConfigFunc,
createRepositoryFunc: createRepositoryFunc,
}
}
// NewRepository creates new repository based on RepositoryConfiguration.
// The repository type must be previously registered using AddSupportedRepository.
func NewRepository(cfg RepositoryConfiguration) (Repository, error) {
	factory, ok := factories[cfg.Type]
	if !ok {
		return nil, fmt.Errorf("unknown repository type: %s", cfg.Type)
	}
	return factory.createRepositoryFunc(cfg.Config)
}

45
storage/repository.go Normal file
View File

@@ -0,0 +1,45 @@
package storage
import (
"io"
"time"
)
// BlockID represents the identifier of a block stored in a Repository.
type BlockID string
// PutOptions controls the behavior of Repository.PutBlock()
type PutOptions struct {
Overwrite bool
IgnoreLimits bool
}
// RepositoryWriter defines the mutating operations on block storage:
// storing, deleting and flushing blocks.
type RepositoryWriter interface {
// PutBlock stores the contents of the reader under the given block ID;
// implementations in this package take ownership of data and close it.
PutBlock(id BlockID, data io.ReadCloser, options PutOptions) error
// DeleteBlock removes the block with the given ID.
DeleteBlock(id BlockID) error
// Flush completes any pending asynchronous writes.
Flush() error
}
// RepositoryReader defines the read-only operations on block storage:
// checking block existence, fetching block contents and listing blocks.
type RepositoryReader interface {
BlockExists(id BlockID) (bool, error)
GetBlock(id BlockID) ([]byte, error)
ListBlocks(prefix BlockID) chan (BlockMetadata)
}
// Repository encapsulates storage for blocks of data, combining read and
// write operations with access to the repository's own configuration.
type Repository interface {
RepositoryReader
RepositoryWriter
Configuration() RepositoryConfiguration
}
// BlockMetadata represents metadata about a single block in a repository.
// If Error field is set, no other field values should be assumed to be correct.
type BlockMetadata struct {
BlockID BlockID
Length uint64
TimeStamp time.Time
Error error
}

118
storage/repository_test.go Normal file
View File

@@ -0,0 +1,118 @@
package storage
import (
"bytes"
"io/ioutil"
"os"
"testing"
)
func TestLoggingRepository(t *testing.T) {
data := map[string][]byte{}
r := NewLoggingWrapper(NewMapRepository(data))
if r == nil {
t.Errorf("unexpected result: %v", r)
}
verifyRepository(t, r)
}
func TestMapRepository(t *testing.T) {
data := map[string][]byte{}
r := NewMapRepository(data)
if r == nil {
t.Errorf("unexpected result: %v", r)
}
verifyRepository(t, r)
}
// TestFileRepository exercises the filesystem-backed repository across
// several shard layouts.
func TestFileRepository(t *testing.T) {
// Test various shard configurations.
for _, shardSpec := range [][]int{
[]int{0},
[]int{1},
[]int{3, 3},
[]int{2},
[]int{1, 1},
[]int{1, 2},
[]int{2, 2, 2},
} {
path, _ := ioutil.TempDir("", "r-fs")
// NOTE(review): defer inside a loop only fires when the whole test
// returns, so temp directories accumulate until the end of the test.
defer os.RemoveAll(path)
r, err := NewFSRepository(&FSRepositoryOptions{
Path: path,
DirectoryShards: shardSpec,
})
if r == nil || err != nil {
t.Errorf("unexpected result: %v %v", r, err)
}
verifyRepository(t, r)
}
}
func verifyRepository(t *testing.T, r Repository) {
blocks := []struct {
blk BlockID
contents []byte
}{
{blk: BlockID("abcdbbf4f0507d054ed5a80a5b65086f602b"), contents: []byte{}},
{blk: BlockID("zxce0e35630770c54668a8cfb4e414c6bf8f"), contents: []byte{1}},
{blk: BlockID("abff4585856ebf0748fd989e1dd623a8963d"), contents: bytes.Repeat([]byte{1}, 1000)},
{blk: BlockID("abgc3dca496d510f492c858a2df1eb824e62"), contents: bytes.Repeat([]byte{1}, 10000)},
}
// First verify that blocks don't exist.
for _, b := range blocks {
if x, err := r.BlockExists(b.blk); x || err != nil {
t.Errorf("block exists or error: %v %v", b.blk, err)
}
data, err := r.GetBlock(b.blk)
if err != ErrBlockNotFound {
t.Errorf("unexpected error when calling GetBlock(%v): %v", b.blk, err)
}
if data != nil {
t.Errorf("got data when calling GetBlock(%v): %v", b.blk, data)
}
}
// Now add blocks.
for _, b := range blocks {
r.PutBlock(b.blk, ioutil.NopCloser(bytes.NewBuffer(b.contents)), PutOptions{})
if x, err := r.BlockExists(b.blk); !x || err != nil {
t.Errorf("block does not exist after adding it: %v %v", b.blk, err)
}
data, err := r.GetBlock(b.blk)
if err != nil {
t.Errorf("unexpected error when calling GetBlock(%v) after adding: %v", b.blk, err)
}
if !bytes.Equal(data, b.contents) {
t.Errorf("got data when calling GetBlock(%v): %v", b.blk, data)
}
}
// List
ch := r.ListBlocks(BlockID("ab"))
e1, ok := <-ch
if !ok || e1.BlockID != blocks[0].blk {
t.Errorf("missing result 0")
}
e2, ok := <-ch
if !ok || e2.BlockID != blocks[2].blk {
t.Errorf("missing result 2")
}
e3, ok := <-ch
if !ok || e3.BlockID != blocks[3].blk {
t.Errorf("missing result 3")
}
e4, ok := <-ch
if ok {
t.Errorf("unexpected item: %v", e4)
}
if e1.TimeStamp.After(e2.TimeStamp) || e2.TimeStamp.After(e3.TimeStamp) {
t.Errorf("timings are not sorted: %v %v %v", e1.TimeStamp, e2.TimeStamp, e3.TimeStamp)
}
}

128
storage/writeback.go Normal file
View File

@@ -0,0 +1,128 @@
// Wrapper which implements asynchronous (write-back) PutBlock and DeleteBlock operation
// useful for slower backends (cloud).
package storage
import (
"fmt"
"io"
"sync"
"sync/atomic"
)
type writeBackRepository struct {
Repository
channel chan writeBackRequest
deferredError atomic.Value
workerCount int
}
type writeBackRequest struct {
action func() error
workerPaused *sync.WaitGroup
workerRelease *sync.WaitGroup
debugInfo string
}
// PutBlock queues an asynchronous write of the given block and returns
// immediately. If a previous asynchronous operation has failed, the stored
// error is returned and the data reader is closed without being queued.
func (wb *writeBackRepository) PutBlock(blockID BlockID, data io.ReadCloser, options PutOptions) error {
err := wb.getDeferredError()
if err != nil {
data.Close()
return err
}
wb.channel <- writeBackRequest{
action: func() error {
return wb.Repository.PutBlock(blockID, data, options)
},
debugInfo: fmt.Sprintf("Put(%s)", blockID),
}
return nil
}
func (wb *writeBackRepository) getDeferredError() error {
deferredError := wb.deferredError.Load()
if deferredError != nil {
return deferredError.(error)
}
return nil
}
func (wb *writeBackRepository) DeleteBlock(blockID BlockID) error {
wb.channel <- writeBackRequest{
action: func() error {
return wb.Repository.DeleteBlock(blockID)
},
debugInfo: fmt.Sprintf("Delete(%s)", blockID),
}
return nil
}
// Flush blocks until all previously queued writes have been processed and
// then flushes the underlying repository. It works by parking every worker:
// each worker receives a barrier request, joins the 'paused' wait group and
// then waits on the 'release' group; once all workers are parked, every
// earlier request must have completed.
func (wb *writeBackRepository) Flush() error {
rwg := sync.WaitGroup{}
rwg.Add(1)
// Create a wait group that all workers will join.
wg := sync.WaitGroup{}
wg.Add(wb.workerCount)
// Send a request to all workers that causes them to report to the waitgroup.
for n := 0; n < wb.workerCount; n++ {
wb.channel <- writeBackRequest{
workerPaused: &wg,
workerRelease: &rwg,
}
}
// Wait until all workers join the wait group.
wg.Wait()
// Now release them all.
rwg.Done()
return wb.Repository.Flush()
}
// processRequest executes a single queued request on a worker goroutine.
// A request carrying workerPaused/workerRelease is a Flush barrier: the
// worker reports that it is idle and then waits to be released. Once any
// action has failed, later actions are skipped and the first error is kept.
func (wb *writeBackRepository) processRequest(req writeBackRequest) {
if req.workerPaused != nil {
req.workerPaused.Done()
req.workerRelease.Wait()
return
}
if wb.getDeferredError() != nil {
return
}
err := req.action()
if err != nil {
wb.deferredError.Store(err)
}
}
// NewWriteBackWrapper returns a Repository wrapper that processes writes asynchronously using the specified
// number of worker goroutines. This wrapper is best used with Repositories that exhibit high latency.
// The request channel is buffered to workerCount entries; workers run until
// the channel is closed (note: nothing in this file closes it, so workers
// live for the lifetime of the process).
func NewWriteBackWrapper(wrapped Repository, workerCount int) Repository {
ch := make(chan writeBackRequest, workerCount)
result := &writeBackRepository{
Repository: wrapped,
channel: ch,
workerCount: workerCount,
}
for i := 0; i < workerCount; i++ {
go func(workerId int) {
for {
req, ok := <-ch
if !ok {
break
}
result.processRequest(req)
}
}(i)
}
return result
}