commit d3a68a7055bf7f7ab5979aeae8d45de00c2900a3
Author: Jarek Kowalski
Date:   Mon Mar 21 20:08:35 2016 -0700

    initial

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 000000000..e43b0f988
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.DS_Store
diff --git a/Makefile b/Makefile
new file mode 100644
index 000000000..699492bfd
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,8 @@
+build:
+	go install github.com/kopia/kopia/cmd/kopia
+
+deps:
+	go get -u -t -v github.com/kopia/kopia/...
+
+test:
+	go test -timeout 30s github.com/kopia/kopia/...
diff --git a/storage/config.go b/storage/config.go
new file mode 100644
index 000000000..64e523b44
--- /dev/null
+++ b/storage/config.go
@@ -0,0 +1,42 @@
+package storage
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+// RepositoryConfiguration is a JSON-serializable description of a Repository and its configuration.
+type RepositoryConfiguration struct {
+	Type   string
+	Config interface{}
+}
+
+// UnmarshalJSON parses the JSON-encoded data into RepositoryConfiguration.
+func (c *RepositoryConfiguration) UnmarshalJSON(b []byte) error {
+	raw := struct {
+		Type string          `json:"type"`
+		Data json.RawMessage `json:"config"`
+	}{}
+
+	if err := json.Unmarshal(b, &raw); err != nil {
+		return err
+	}
+
+	// Guard against unregistered repository types, which would otherwise panic below.
+	factory, ok := factories[raw.Type]
+	if !ok {
+		return fmt.Errorf("unknown repository type: %q", raw.Type)
+	}
+
+	c.Type = raw.Type
+	c.Config = factory.defaultConfigFunc()
+	if err := json.Unmarshal(raw.Data, c.Config); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// MarshalJSON returns JSON-encoded repository configuration.
+func (c *RepositoryConfiguration) MarshalJSON() ([]byte, error) {
+	return json.Marshal(struct {
+		Type string      `json:"type"`
+		Data interface{} `json:"config"`
+	}{
+		Type: c.Type,
+		Data: c.Config,
+	})
+}
diff --git a/storage/constants.go b/storage/constants.go
new file mode 100644
index 000000000..82be0547e
--- /dev/null
+++ b/storage/constants.go
@@ -0,0 +1 @@
+package storage
diff --git a/storage/doc.go b/storage/doc.go
new file mode 100644
index 000000000..73b464409
--- /dev/null
+++ b/storage/doc.go
@@ -0,0 +1,2 @@
+// Package storage implements repositories for connecting to various types of storage backends.
+package storage
diff --git a/storage/errors.go b/storage/errors.go
new file mode 100644
index 000000000..c1800b63a
--- /dev/null
+++ b/storage/errors.go
@@ -0,0 +1,16 @@
+package storage
+
+import "errors"
+
+var (
+	// ErrBlockNotFound is returned when a block cannot be found in the repository.
+	ErrBlockNotFound = errors.New("block not found")
+
+	// ErrInvalidChecksum is returned when a repository block is invalid, which may indicate
+	// that decryption has failed.
+	ErrInvalidChecksum = errors.New("invalid checksum")
+
+	// ErrWriteLimitExceeded is returned when the maximum amount of data has already been written
+	// to the repository.
+	ErrWriteLimitExceeded = errors.New("write limit exceeded")
+)
diff --git a/storage/filesystem.go b/storage/filesystem.go
new file mode 100644
index 000000000..ca0e369b6
--- /dev/null
+++ b/storage/filesystem.go
@@ -0,0 +1,260 @@
+package storage
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+)
+
+const (
+	fsRepositoryType = "fs"
+
+	fsRepositoryChunkSuffix = ".f"
+)
+
+var (
+	fsDefaultShards               = []int{1, 3, 3}
+	fsDefaultFileMode os.FileMode = 0664
+	fsDefaultDirMode  os.FileMode = 0775
+)
+
+type fsRepository struct {
+	FSRepositoryOptions
+}
+
+// FSRepositoryOptions defines options for a filesystem-backed repository.
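+//
+// A minimal configuration sketch (the path is hypothetical); zero-valued
+// fields fall back to the package defaults when NewFSRepository is called:
+//
+//	options := &FSRepositoryOptions{
+//		Path:            "/var/lib/kopia-repo",
+//		DirectoryShards: []int{1, 3, 3},
+//	}
+//	repo, err := NewFSRepository(options)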
+type FSRepositoryOptions struct {
+	Path string `json:"path"`
+
+	DirectoryShards []int `json:"dirShards"`
+
+	FileMode      os.FileMode `json:"fileMode"`
+	DirectoryMode os.FileMode `json:"dirMode"`
+
+	FileUID *int `json:"uid,omitempty"`
+	FileGID *int `json:"gid,omitempty"`
+}
+
+func (fs *fsRepository) BlockExists(blockID BlockID) (bool, error) {
+	_, path := fs.getShardedPathAndFilePath(blockID)
+	_, err := os.Stat(path)
+	if err == nil {
+		return true, nil
+	}
+
+	if os.IsNotExist(err) {
+		return false, nil
+	}
+
+	return false, err
+}
+
+func (fs *fsRepository) GetBlock(blockID BlockID) ([]byte, error) {
+	_, path := fs.getShardedPathAndFilePath(blockID)
+	d, err := ioutil.ReadFile(path)
+	if err == nil {
+		return d, nil
+	}
+
+	if os.IsNotExist(err) {
+		return nil, ErrBlockNotFound
+	}
+
+	return nil, err
+}
+
+func getBlockIDFromFileName(name string) (BlockID, bool) {
+	if strings.HasSuffix(name, fsRepositoryChunkSuffix) {
+		return BlockID(name[0 : len(name)-len(fsRepositoryChunkSuffix)]), true
+	}
+
+	return BlockID(""), false
+}
+
+func makeFileName(blockID BlockID) string {
+	return string(blockID) + fsRepositoryChunkSuffix
+}
+
+func (fs *fsRepository) ListBlocks(prefix BlockID) chan (BlockMetadata) {
+	result := make(chan (BlockMetadata))
+
+	prefixString := string(prefix)
+
+	var walkDir func(string, string)
+
+	walkDir = func(directory string, currentPrefix string) {
+		if entries, err := ioutil.ReadDir(directory); err == nil {
+			for _, e := range entries {
+				if e.IsDir() {
+					newPrefix := currentPrefix + e.Name()
+					var match bool
+
+					if len(prefixString) > len(newPrefix) {
+						match = strings.HasPrefix(prefixString, newPrefix)
+					} else {
+						match = strings.HasPrefix(newPrefix, prefixString)
+					}
+
+					if match {
+						walkDir(directory+"/"+e.Name(), currentPrefix+e.Name())
+					}
+				} else if fullID, ok := getBlockIDFromFileName(currentPrefix + e.Name()); ok {
+					if strings.HasPrefix(string(fullID), prefixString) {
+						result <- BlockMetadata{
+							BlockID:   fullID,
+							Length:    uint64(e.Size()),
+							TimeStamp: e.ModTime(),
+						}
+					}
+				}
+			}
+		}
+	}
+
+	walkDirAndClose := func(directory string) {
+		walkDir(directory, "")
+		close(result)
+	}
+
+	go walkDirAndClose(fs.Path)
+	return result
+}
+
+func (fs *fsRepository) PutBlock(blockID BlockID, data io.ReadCloser, options PutOptions) error {
+	// Close the data reader regardless of whether we use it or not.
+	defer data.Close()
+
+	shardPath, path := fs.getShardedPathAndFilePath(blockID)
+
+	// Open a temporary file, creating the shard directory if required.
+	tempFile := fmt.Sprintf("%s.tmp.%d", path, rand.Int())
+	flags := os.O_CREATE | os.O_WRONLY | os.O_EXCL
+	f, err := os.OpenFile(tempFile, flags, fs.FileMode)
+	if os.IsNotExist(err) {
+		if err = os.MkdirAll(shardPath, fs.DirectoryMode); err != nil {
+			return fmt.Errorf("cannot create directory: %v", err)
+		}
+		f, err = os.OpenFile(tempFile, flags, fs.FileMode)
+	}
+
+	if err != nil {
+		return fmt.Errorf("cannot create temporary file: %v", err)
+	}
+
+	// Copy data to the temporary file.
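+	// Writing to a temporary file first and renaming it into place afterwards
+	// keeps partially-written blocks invisible to concurrent readers.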
+	if _, err := io.Copy(f, data); err != nil {
+		f.Close()
+		os.Remove(tempFile)
+		return fmt.Errorf("cannot write to temporary file: %v", err)
+	}
+
+	if err := f.Close(); err != nil {
+		os.Remove(tempFile)
+		return err
+	}
+
+	err = os.Rename(tempFile, path)
+	if err != nil {
+		os.Remove(tempFile)
+		return err
+	}
+
+	if fs.FileUID != nil && fs.FileGID != nil && os.Geteuid() == 0 {
+		os.Chown(path, *fs.FileUID, *fs.FileGID)
+	}
+
+	return nil
+}
+
+func (fs *fsRepository) DeleteBlock(blockID BlockID) error {
+	_, path := fs.getShardedPathAndFilePath(blockID)
+	err := os.Remove(path)
+	if err == nil || os.IsNotExist(err) {
+		return nil
+	}
+
+	return err
+}
+
+func (fs *fsRepository) Flush() error {
+	return nil
+}
+
+func (fs *fsRepository) getShardDirectory(blockID BlockID) (string, BlockID) {
+	shardPath := fs.Path
+	blockIDString := string(blockID)
+	if len(blockIDString) < 20 {
+		return shardPath, blockID
+	}
+	for _, size := range fs.DirectoryShards {
+		shardPath = filepath.Join(shardPath, blockIDString[0:size])
+		blockIDString = blockIDString[size:]
+	}
+
+	return shardPath, BlockID(blockIDString)
+}
+
+func (fs *fsRepository) getShardedPathAndFilePath(blockID BlockID) (string, string) {
+	shardPath, blockID := fs.getShardDirectory(blockID)
+	result := filepath.Join(shardPath, makeFileName(blockID))
+	return shardPath, result
+}
+
+func parseShardString(shardString string) ([]int, error) {
+	if shardString == "" {
+		// By default Xabcdefghijklmnop is stored in 'X/abc/def/Xabcdefghijklmnop'
+		return fsDefaultShards, nil
+	}
+
+	result := make([]int, 0, 4)
+	for _, value := range strings.Split(shardString, ",") {
+		shardLength, err := strconv.ParseInt(value, 10, 32)
+		if err != nil {
+			return nil, fmt.Errorf("invalid shard specification: '%s'", value)
+		}
+		result = append(result, int(shardLength))
+	}
+	return result, nil
+}
+
+func (fs *fsRepository) Configuration() RepositoryConfiguration {
+	return RepositoryConfiguration{
+		fsRepositoryType,
+		&fs.FSRepositoryOptions,
+	}
+}
+
+// NewFSRepository creates a new filesystem-backed repository in the specified directory.
+func NewFSRepository(options *FSRepositoryOptions) (Repository, error) {
+	var err error
+
+	if _, err = os.Stat(options.Path); err != nil {
+		return nil, fmt.Errorf("cannot access repository path: %v", err)
+	}
+
+	r := &fsRepository{
+		FSRepositoryOptions: *options,
+	}
+
+	if r.DirectoryShards == nil {
+		r.DirectoryShards = fsDefaultShards
+	}
+
+	if r.DirectoryMode == 0 {
+		r.DirectoryMode = fsDefaultDirMode
+	}
+
+	if r.FileMode == 0 {
+		r.FileMode = fsDefaultFileMode
+	}
+
+	return r, nil
+}
+
+func init() {
+	AddSupportedRepository(
+		fsRepositoryType,
+		func() interface{} { return &FSRepositoryOptions{} },
+		func(cfg interface{}) (Repository, error) {
+			return NewFSRepository(cfg.(*FSRepositoryOptions))
+		})
+}
diff --git a/storage/gcs.go b/storage/gcs.go
new file mode 100644
index 000000000..0b22a654a
--- /dev/null
+++ b/storage/gcs.go
@@ -0,0 +1,384 @@
+package storage
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"os/exec"
+	"time"
+
+	"golang.org/x/net/context"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+
+	"google.golang.org/api/googleapi"
+	gcsclient "google.golang.org/api/storage/v1"
+)
+
+const (
+	gcsRepositoryType = "gcs"
+	gcsTokenCacheDir  = ".kopia"
+
+	// These are not really secret, since they ship with the installed application.
+	googleCloudClientID     = "194841383482-nmn10h4mnllnsvou7qr55tfh5jsmtkap.apps.googleusercontent.com"
+	googleCloudClientSecret = "ZL52E96Q7iRCD9YXVA7U6UaI"
+)
+
+// GCSRepositoryOptions defines options for a Google Cloud Storage-backed repository.
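+//
+// A JSON configuration sketch, as consumed by RepositoryConfiguration
+// (bucket and prefix are hypothetical):
+//
+//	{"type":"gcs","config":{"bucket":"my-backup-bucket","prefix":"kopia/"}}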
+type GCSRepositoryOptions struct {
+	// BucketName is the name of the GCS bucket where data is stored.
+	BucketName string `json:"bucket"`
+
+	// Prefix specifies an additional string to prepend to all object names.
+	Prefix string `json:"prefix,omitempty"`
+
+	// TokenCacheFile is the name of the file that will persist the OAuth2 token.
+	// If not specified, the token will be persisted in GCSRepositoryOptions.
+	TokenCacheFile string `json:"tokenCacheFile,omitempty"`
+
+	// Token stores the OAuth2 token (when TokenCacheFile is empty).
+	Token *oauth2.Token `json:"token,omitempty"`
+
+	// ReadOnly causes the repository to be configured without write permissions, to prevent accidental
+	// modifications to the data.
+	ReadOnly bool `json:"readonly"`
+
+	// IgnoreDefaultCredentials disables the use of credentials managed by Google Cloud SDK (gcloud).
+	IgnoreDefaultCredentials bool `json:"ignoreDefaultCredentials"`
+}
+
+type gcsRepository struct {
+	GCSRepositoryOptions
+	objectsService *gcsclient.ObjectsService
+}
+
+func (gcs *gcsRepository) BlockExists(b BlockID) (bool, error) {
+	_, err := gcs.objectsService.Get(gcs.BucketName, gcs.getObjectNameString(b)).Do()
+	if err == nil {
+		return true, nil
+	}
+
+	// A 404 means the block does not exist; anything else is a real error.
+	if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == 404 {
+		return false, nil
+	}
+
+	return false, err
+}
+
+func (gcs *gcsRepository) GetBlock(b BlockID) ([]byte, error) {
+	v, err := gcs.objectsService.Get(gcs.BucketName, gcs.getObjectNameString(b)).Download()
+	if err != nil {
+		if err, ok := err.(*googleapi.Error); ok {
+			if err.Code == 404 {
+				return nil, ErrBlockNotFound
+			}
+		}
+
+		return nil, fmt.Errorf("unable to get block '%s': %v", b, err)
+	}
+
+	defer v.Body.Close()
+
+	return ioutil.ReadAll(v.Body)
+}
+
+func (gcs *gcsRepository) PutBlock(b BlockID, data io.ReadCloser, options PutOptions) error {
+	object := gcsclient.Object{
+		Name: gcs.getObjectNameString(b),
+	}
+	defer data.Close()
+	_, err := gcs.objectsService.Insert(gcs.BucketName, &object).
+		IfGenerationMatch(0).
+		Media(data).
+		Do()
+
+	return err
+}
+
+func (gcs *gcsRepository) DeleteBlock(b BlockID) error {
+	// Use the full object name so that the configured prefix is honored.
+	err := gcs.objectsService.Delete(gcs.BucketName, gcs.getObjectNameString(b)).Do()
+	if err != nil {
+		return fmt.Errorf("unable to delete block %s: %v", b, err)
+	}
+
+	return nil
+}
+
+func (gcs *gcsRepository) getObjectNameString(b BlockID) string {
+	return gcs.Prefix + string(b)
+}
+
+func (gcs *gcsRepository) ListBlocks(prefix BlockID) chan (BlockMetadata) {
+	ch := make(chan BlockMetadata, 100)
+
+	go func() {
+		ps := gcs.getObjectNameString(prefix)
+		page, err := gcs.objectsService.List(gcs.BucketName).
+			Prefix(ps).Do()
+		for {
+			if err != nil {
+				ch <- BlockMetadata{Error: err}
+				break
+			}
+
+			for _, o := range page.Items {
+				t, e := time.Parse(time.RFC3339, o.TimeCreated)
+				if e != nil {
+					ch <- BlockMetadata{
+						Error: e,
+					}
+				} else {
+					ch <- BlockMetadata{
+						BlockID:   BlockID(o.Name)[len(gcs.Prefix):],
+						Length:    o.Size,
+						TimeStamp: t,
+					}
+				}
+			}
+
+			if page.NextPageToken != "" {
+				// Continue with the server-provided page token, not the name prefix.
+				page, err = gcs.objectsService.List(gcs.BucketName).
+					PageToken(page.NextPageToken).
+					Prefix(ps).Do()
+			} else {
+				break
+			}
+		}
+		close(ch)
+	}()
+
+	return ch
+}
+
+func (gcs *gcsRepository) Flush() error {
+	return nil
+}
+
+func (gcs *gcsRepository) Configuration() RepositoryConfiguration {
+	return RepositoryConfiguration{
+		gcsRepositoryType,
+		&gcs.GCSRepositoryOptions,
+	}
+}
+
+func tokenFromFile(file string) (*oauth2.Token, error) {
+	f, err := os.Open(file)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	t := oauth2.Token{}
+	err = json.NewDecoder(f).Decode(&t)
+	return &t, err
+}
+
+func saveToken(file string, token *oauth2.Token) {
+	f, err := os.Create(file)
+	if err != nil {
+		log.Printf("Warning: failed to cache oauth token: %v", err)
+		return
+	}
+	defer f.Close()
+	json.NewEncoder(f).Encode(token)
+}
+
+// NewGCSRepository creates a new Google Cloud Storage-backed repository with the specified options:
+//
+// - the 'BucketName' field is required and all other parameters are optional.
+//
+// By default the connection reuses credentials managed by the Google Cloud SDK (https://cloud.google.com/sdk/),
+// but this can be disabled by setting IgnoreDefaultCredentials to true.
+func NewGCSRepository(options *GCSRepositoryOptions) (Repository, error) {
+	ctx := context.TODO()
+
+	gcs := &gcsRepository{
+		GCSRepositoryOptions: *options,
+	}
+
+	if gcs.BucketName == "" {
+		return nil, errors.New("bucket name must be specified")
+	}
+
+	var scope string
+	if options.ReadOnly {
+		scope = gcsclient.DevstorageReadOnlyScope
+	} else {
+		scope = gcsclient.DevstorageReadWriteScope
+	}
+
+	// Try to get the default client if possible and not disabled by options.
+	var client *http.Client
+	var err error
+
+	if !gcs.IgnoreDefaultCredentials {
+		client, _ = google.DefaultClient(ctx, scope)
+	}
+
+	if client == nil {
+		// Fall back to asking the user to authenticate.
+		config := &oauth2.Config{
+			ClientID:     googleCloudClientID,
+			ClientSecret: googleCloudClientSecret,
+			Endpoint:     google.Endpoint,
+			Scopes:       []string{scope},
+		}
+
+		var token *oauth2.Token
+		if gcs.Token != nil {
+			// Token was provided, use it.
+			token = gcs.Token
+		} else {
+			if gcs.TokenCacheFile == "" {
+				// Cache file not provided, token will be saved in repository configuration.
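+				// tokenFromWeb below runs a short-lived local HTTP server to capture
+				// the OAuth2 redirect and opens the browser to complete the flow.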
+				token, err = tokenFromWeb(ctx, config)
+				if err != nil {
+					return nil, fmt.Errorf("cannot retrieve OAuth2 token: %v", err)
+				}
+				gcs.Token = token
+			} else {
+				token, err = tokenFromFile(gcs.TokenCacheFile)
+				if err != nil {
+					token, err = tokenFromWeb(ctx, config)
+					if err != nil {
+						return nil, fmt.Errorf("cannot retrieve OAuth2 token: %v", err)
+					}
+				}
+				saveToken(gcs.TokenCacheFile, token)
+			}
+		}
+		client = config.Client(ctx, token)
+	}
+
+	svc, err := gcsclient.New(client)
+	if err != nil {
+		return nil, fmt.Errorf("unable to create GCS client: %v", err)
+	}
+
+	gcs.objectsService = svc.Objects
+
+	return gcs, nil
+}
+
+func readGcsTokenFromFile(filePath string) (*oauth2.Token, error) {
+	f, err := os.Open(filePath)
+	if err != nil {
+		return nil, err
+	}
+
+	defer f.Close()
+
+	token := &oauth2.Token{}
+	err = json.NewDecoder(f).Decode(token)
+	if err != nil {
+		return nil, fmt.Errorf("unable to decode token: %v", err)
+	}
+
+	return token, nil
+}
+
+func writeTokenToFile(filePath string, token *oauth2.Token) error {
+	f, err := os.Create(filePath)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	return json.NewEncoder(f).Encode(*token)
+}
+
+func tokenFromWeb(ctx context.Context, config *oauth2.Config) (*oauth2.Token, error) {
+	ch := make(chan string)
+	randState := fmt.Sprintf("st%d", time.Now().UnixNano())
+	ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		if req.URL.Path == "/favicon.ico" {
+			http.Error(rw, "", 404)
+			return
+		}
+		if req.FormValue("state") != randState {
+			log.Printf("State doesn't match: req = %#v", req)
+			http.Error(rw, "", 500)
+			return
+		}
+		if code := req.FormValue("code"); code != "" {
+			fmt.Fprintf(rw, "<h1>Success</h1>Authorized.")
+			rw.(http.Flusher).Flush()
+			ch <- code
+			return
+		}
+		log.Printf("no code")
+		http.Error(rw, "", 500)
+	}))
+	defer ts.Close()
+
+	config.RedirectURL = ts.URL
+	authURL := config.AuthCodeURL(randState)
+	go openURL(authURL)
+	log.Printf("Authorize this app at: %s", authURL)
+	code := <-ch
+	log.Printf("Got code: %s", code)
+
+	token, err := config.Exchange(ctx, code)
+	if err != nil {
+		return nil, fmt.Errorf("token exchange error: %v", err)
+	}
+
+	return token, nil
+}
+
+func openURL(url string) error {
+	try := []string{"xdg-open", "google-chrome", "open"}
+	for _, bin := range try {
+		if err := exec.Command(bin, url).Run(); err == nil {
+			return nil
+		}
+	}
+	return fmt.Errorf("error opening URL in browser")
+}
+
+func authPrompt(url string, state string) (authenticationCode string, err error) {
+	ch := make(chan string)
+
+	ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		if req.URL.Path == "/favicon.ico" {
+			http.Error(rw, "", 404)
+			return
+		}
+		if req.FormValue("state") != state {
+			log.Printf("State doesn't match: req = %#v", req)
+			http.Error(rw, "", 500)
+			return
+		}
+		if code := req.FormValue("code"); code != "" {
+			fmt.Fprintf(rw, "<h1>Success</h1>Authorized.")
+			rw.(http.Flusher).Flush()
+			ch <- code
+			return
+		}
+		log.Printf("no code")
+		http.Error(rw, "", 500)
+	}))
+	defer ts.Close()
+
+	log.Println("Go to", url)
+	var code string
+	n, err := fmt.Scanf("%s", &code)
+	if n == 1 {
+		return code, nil
+	}
+
+	return "", err
+}
+
+func init() {
+	AddSupportedRepository(
+		gcsRepositoryType,
+		func() interface{} {
+			return &GCSRepositoryOptions{}
+		},
+		func(cfg interface{}) (Repository, error) {
+			return NewGCSRepository(cfg.(*GCSRepositoryOptions))
+		})
+}
diff --git a/storage/limit.go b/storage/limit.go
new file mode 100644
index 000000000..61ca0794d
--- /dev/null
+++ b/storage/limit.go
@@ -0,0 +1,45 @@
+package storage
+
+import (
+	"io"
+	"sync/atomic"
+)
+
+type writeLimitRepository struct {
+	Repository
+
+	remainingBytes int64
+}
+
+type writeLimitReadCloser struct {
+	io.ReadCloser
+	repo *writeLimitRepository
+}
+
+func (s *writeLimitReadCloser) Read(b []byte) (int, error) {
+	n, err := s.ReadCloser.Read(b)
+	atomic.AddInt64(&s.repo.remainingBytes, int64(-n))
+	return n, err
+}
+
+func (s *writeLimitRepository) PutBlock(id BlockID, data io.ReadCloser, options PutOptions) error {
+	if !options.IgnoreLimits {
+		if atomic.LoadInt64(&s.remainingBytes) <= 0 {
+			return ErrWriteLimitExceeded
+		}
+	}
+
+	return s.Repository.PutBlock(id, &writeLimitReadCloser{
+		ReadCloser: data,
+		repo:       s,
+	}, options)
+}
+
+// NewWriteLimitWrapper returns a Repository wrapper that limits the number of bytes written to a repository.
+// Once the limit is reached, writes fail with ErrWriteLimitExceeded.
+func NewWriteLimitWrapper(wrapped Repository, bytes int64) Repository {
+	return &writeLimitRepository{
+		Repository:     wrapped,
+		remainingBytes: bytes,
+	}
+}
diff --git a/storage/logging.go b/storage/logging.go
new file mode 100644
index 000000000..7b3f41538
--- /dev/null
+++ b/storage/logging.go
@@ -0,0 +1,53 @@
+package storage
+
+import (
+	"io"
+	"log"
+)
+
+type loggingRepository struct {
+	Repository
+}
+
+func (s *loggingRepository) BlockExists(id BlockID) (bool, error) {
+	result, err := s.Repository.BlockExists(id)
+	log.Printf("BlockExists(%#v)=%#v,%#v", id, result, err)
+	return result, err
+}
+
+func (s *loggingRepository) GetBlock(id BlockID) ([]byte, error) {
+	result, err := s.Repository.GetBlock(id)
+	if len(result) < 20 {
+		log.Printf("GetBlock(%#v)=(%#v, %#v)", id, result, err)
+	} else {
+		log.Printf("GetBlock(%#v)=({%#v bytes}, %#v)", id, len(result), err)
+	}
+	return result, err
+}
+
+func (s *loggingRepository) PutBlock(id BlockID, data io.ReadCloser, options PutOptions) error {
+	err := s.Repository.PutBlock(id, data, options)
+	log.Printf("PutBlock(%#v, %#v)=%#v", id, options, err)
+	return err
+}
+
+func (s *loggingRepository) DeleteBlock(id BlockID) error {
+	err := s.Repository.DeleteBlock(id)
+	log.Printf("DeleteBlock(%#v)=%#v", id, err)
+	return err
+}
+
+func (s *loggingRepository) ListBlocks(prefix BlockID) chan (BlockMetadata) {
+	log.Printf("ListBlocks(%#v)", prefix)
+	return s.Repository.ListBlocks(prefix)
+}
+
+func (s *loggingRepository) Flush() error {
+	log.Printf("Flush()")
+	return s.Repository.Flush()
+}
+
+// NewLoggingWrapper returns a Repository wrapper that logs all repository commands.
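+//
+// A usage sketch (the in-memory map repository is for illustration):
+//
+//	repo := NewLoggingWrapper(NewMapRepository(map[string][]byte{}))
+//	exists, err := repo.BlockExists("block-1") // the call and its result are logged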
+func NewLoggingWrapper(wrapped Repository) Repository {
+	return &loggingRepository{wrapped}
+}
diff --git a/storage/map.go b/storage/map.go
new file mode 100644
index 000000000..b0b84c7a1
--- /dev/null
+++ b/storage/map.go
@@ -0,0 +1,99 @@
+package storage
+
+import (
+	"io"
+	"io/ioutil"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+)
+
+type mapRepository struct {
+	data  map[string][]byte
+	mutex sync.RWMutex
+}
+
+func (s *mapRepository) Configuration() RepositoryConfiguration {
+	return RepositoryConfiguration{}
+}
+
+func (s *mapRepository) BlockExists(id BlockID) (bool, error) {
+	s.mutex.RLock()
+	defer s.mutex.RUnlock()
+	_, ok := s.data[string(id)]
+	return ok, nil
+}
+
+func (s *mapRepository) GetBlock(id BlockID) ([]byte, error) {
+	s.mutex.RLock()
+	defer s.mutex.RUnlock()
+
+	data, ok := s.data[string(id)]
+	if ok {
+		return data, nil
+	}
+
+	return nil, ErrBlockNotFound
+}
+
+func (s *mapRepository) PutBlock(id BlockID, data io.ReadCloser, options PutOptions) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	c, err := ioutil.ReadAll(data)
+	data.Close()
+	if err != nil {
+		return err
+	}
+
+	s.data[string(id)] = c
+	return nil
+}
+
+func (s *mapRepository) DeleteBlock(id BlockID) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	delete(s.data, string(id))
+	return nil
+}
+
+func (s *mapRepository) ListBlocks(prefix BlockID) chan (BlockMetadata) {
+	ch := make(chan (BlockMetadata))
+	fixedTime := time.Now()
+	go func() {
+		s.mutex.RLock()
+		defer s.mutex.RUnlock()
+
+		keys := []string{}
+		for k := range s.data {
+			if strings.HasPrefix(k, string(prefix)) {
+				keys = append(keys, k)
+			}
+		}
+
+		sort.Strings(keys)
+
+		for _, k := range keys {
+			v := s.data[k]
+			ch <- BlockMetadata{
+				BlockID:   BlockID(k),
+				Length:    uint64(len(v)),
+				TimeStamp: fixedTime,
+			}
+		}
+		close(ch)
+	}()
+	return ch
+}
+
+func (s *mapRepository) Flush() error {
+	return nil
+}
+
+// NewMapRepository returns an implementation of Repository backed by the contents of the given map.
+// Used primarily for testing.
+func NewMapRepository(data map[string][]byte) Repository {
+	return &mapRepository{data: data}
+}
diff --git a/storage/registry.go b/storage/registry.go
new file mode 100644
index 000000000..0387d80ee
--- /dev/null
+++ b/storage/registry.go
@@ -0,0 +1,35 @@
+package storage
+
+import "fmt"
+
+var (
+	factories = map[string]repositoryFactory{}
+)
+
+// repositoryFactory allows creation of repositories in a generic way.
+type repositoryFactory struct {
+	defaultConfigFunc    func() interface{}
+	createRepositoryFunc func(interface{}) (Repository, error)
+}
+
+// AddSupportedRepository registers a factory function to create repositories with a given type name.
+func AddSupportedRepository(
+	repositoryType string,
+	defaultConfigFunc func() interface{},
+	createRepositoryFunc func(interface{}) (Repository, error)) {
+
+	factories[repositoryType] = repositoryFactory{
+		defaultConfigFunc:    defaultConfigFunc,
+		createRepositoryFunc: createRepositoryFunc,
+	}
+}
+
+// NewRepository creates a new repository based on RepositoryConfiguration.
+// The repository type must be previously registered using AddSupportedRepository.
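+//
+// A minimal sketch, assuming the "fs" type registered by this package
+// (the path is hypothetical):
+//
+//	var cfg RepositoryConfiguration
+//	if err := json.Unmarshal([]byte(`{"type":"fs","config":{"path":"/tmp/repo"}}`), &cfg); err != nil {
+//		log.Fatal(err)
+//	}
+//	repo, err := NewRepository(cfg)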
+func NewRepository(cfg RepositoryConfiguration) (Repository, error) {
+	if factory, ok := factories[cfg.Type]; ok {
+		return factory.createRepositoryFunc(cfg.Config)
+	}
+
+	return nil, fmt.Errorf("unknown repository type: %s", cfg.Type)
+}
diff --git a/storage/repository.go b/storage/repository.go
new file mode 100644
index 000000000..cdb257999
--- /dev/null
+++ b/storage/repository.go
@@ -0,0 +1,45 @@
+package storage
+
+import (
+	"io"
+	"time"
+)
+
+// BlockID represents the identifier of a block stored in a Repository.
+type BlockID string
+
+// PutOptions controls the behavior of Repository.PutBlock().
+type PutOptions struct {
+	Overwrite    bool
+	IgnoreLimits bool
+}
+
+// RepositoryWriter supports writing, deleting and flushing blocks of data.
+type RepositoryWriter interface {
+	// PutBlock uploads the block with the given ID to the repository.
+	PutBlock(id BlockID, data io.ReadCloser, options PutOptions) error
+	DeleteBlock(id BlockID) error
+	Flush() error
+}
+
+// RepositoryReader supports checking for block existence, reading and listing blocks of data.
+type RepositoryReader interface {
+	BlockExists(id BlockID) (bool, error)
+	GetBlock(id BlockID) ([]byte, error)
+	ListBlocks(prefix BlockID) chan (BlockMetadata)
+}
+
+// Repository encapsulates storage for blocks of data.
+type Repository interface {
+	RepositoryReader
+	RepositoryWriter
+	Configuration() RepositoryConfiguration
+}
+
+// BlockMetadata represents metadata about a single block in a repository.
+// If the Error field is set, no other field values should be assumed to be correct.
+type BlockMetadata struct {
+	BlockID   BlockID
+	Length    uint64
+	TimeStamp time.Time
+	Error     error
+}
diff --git a/storage/repository_test.go b/storage/repository_test.go
new file mode 100644
index 000000000..fb8589f7e
--- /dev/null
+++ b/storage/repository_test.go
@@ -0,0 +1,118 @@
+package storage
+
+import (
+	"bytes"
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+func TestLoggingRepository(t *testing.T) {
+	data := map[string][]byte{}
+	r := NewLoggingWrapper(NewMapRepository(data))
+	if r == nil {
+		t.Errorf("unexpected result: %v", r)
+	}
+	verifyRepository(t, r)
+}
+
+func TestMapRepository(t *testing.T) {
+	data := map[string][]byte{}
+	r := NewMapRepository(data)
+	if r == nil {
+		t.Errorf("unexpected result: %v", r)
+	}
+	verifyRepository(t, r)
+}
+
+func TestFileRepository(t *testing.T) {
+	// Test various shard configurations.
+	for _, shardSpec := range [][]int{
+		[]int{0},
+		[]int{1},
+		[]int{3, 3},
+		[]int{2},
+		[]int{1, 1},
+		[]int{1, 2},
+		[]int{2, 2, 2},
+	} {
+		path, _ := ioutil.TempDir("", "r-fs")
+		defer os.RemoveAll(path)
+
+		r, err := NewFSRepository(&FSRepositoryOptions{
+			Path:            path,
+			DirectoryShards: shardSpec,
+		})
+		if r == nil || err != nil {
+			t.Errorf("unexpected result: %v %v", r, err)
+		}
+		verifyRepository(t, r)
+	}
+}
+
+func verifyRepository(t *testing.T, r Repository) {
+	blocks := []struct {
+		blk      BlockID
+		contents []byte
+	}{
+		{blk: BlockID("abcdbbf4f0507d054ed5a80a5b65086f602b"), contents: []byte{}},
+		{blk: BlockID("zxce0e35630770c54668a8cfb4e414c6bf8f"), contents: []byte{1}},
+		{blk: BlockID("abff4585856ebf0748fd989e1dd623a8963d"), contents: bytes.Repeat([]byte{1}, 1000)},
+		{blk: BlockID("abgc3dca496d510f492c858a2df1eb824e62"), contents: bytes.Repeat([]byte{1}, 10000)},
+	}
+
+	// First verify that the blocks don't exist.
+	for _, b := range blocks {
+		if x, err := r.BlockExists(b.blk); x || err != nil {
+			t.Errorf("block exists or error: %v %v", b.blk, err)
+		}
+
+		data, err := r.GetBlock(b.blk)
+		if err != ErrBlockNotFound {
+			t.Errorf("unexpected error when calling GetBlock(%v): %v", b.blk, err)
+		}
+		if data != nil {
+			t.Errorf("got data when calling GetBlock(%v): %v", b.blk, data)
+		}
+	}
+
+	// Now add the blocks.
+	for _, b := range blocks {
+		r.PutBlock(b.blk, ioutil.NopCloser(bytes.NewBuffer(b.contents)), PutOptions{})
+
+		if x, err := r.BlockExists(b.blk); !x || err != nil {
+			t.Errorf("block does not exist after adding it: %v %v", b.blk, err)
+		}
+
+		data, err := r.GetBlock(b.blk)
+		if err != nil {
+			t.Errorf("unexpected error when calling GetBlock(%v) after adding: %v", b.blk, err)
+		}
+		if !bytes.Equal(data, b.contents) {
+			t.Errorf("got unexpected data when calling GetBlock(%v): %v", b.blk, data)
+		}
+	}
+
+	// List blocks with the "ab" prefix and verify that they arrive in sorted order.
+	ch := r.ListBlocks(BlockID("ab"))
+	e1, ok := <-ch
+	if !ok || e1.BlockID != blocks[0].blk {
+		t.Errorf("missing result 0")
+	}
+	e2, ok := <-ch
+	if !ok || e2.BlockID != blocks[2].blk {
+		t.Errorf("missing result 2")
+	}
+	e3, ok := <-ch
+	if !ok || e3.BlockID != blocks[3].blk {
+		t.Errorf("missing result 3")
+	}
+	e4, ok := <-ch
+	if ok {
+		t.Errorf("unexpected item: %v", e4)
+	}
+
+	if e1.TimeStamp.After(e2.TimeStamp) || e2.TimeStamp.After(e3.TimeStamp) {
+		t.Errorf("timings are not sorted: %v %v %v", e1.TimeStamp, e2.TimeStamp, e3.TimeStamp)
+	}
+}
diff --git a/storage/writeback.go b/storage/writeback.go
new file mode 100644
index 000000000..c89a89f14
--- /dev/null
+++ b/storage/writeback.go
@@ -0,0 +1,128 @@
+// Wrapper which implements asynchronous (write-back) PutBlock and DeleteBlock operations,
+// useful for slower (typically cloud-based) backends.
+
+package storage
+
+import (
+	"fmt"
+	"io"
+	"sync"
+	"sync/atomic"
+)
+
+type writeBackRepository struct {
+	Repository
+
+	channel       chan writeBackRequest
+	deferredError atomic.Value
+	workerCount   int
+}
+
+type writeBackRequest struct {
+	action        func() error
+	workerPaused  *sync.WaitGroup
+	workerRelease *sync.WaitGroup
+	debugInfo     string
+}
+
+func (wb *writeBackRepository) PutBlock(blockID BlockID, data io.ReadCloser, options PutOptions) error {
+	err := wb.getDeferredError()
+	if err != nil {
+		data.Close()
+		return err
+	}
+
+	wb.channel <- writeBackRequest{
+		action: func() error {
+			return wb.Repository.PutBlock(blockID, data, options)
+		},
+		debugInfo: fmt.Sprintf("Put(%s)", blockID),
+	}
+	return nil
+}
+
+func (wb *writeBackRepository) getDeferredError() error {
+	deferredError := wb.deferredError.Load()
+	if deferredError != nil {
+		return deferredError.(error)
+	}
+
+	return nil
+}
+
+func (wb *writeBackRepository) DeleteBlock(blockID BlockID) error {
+	wb.channel <- writeBackRequest{
+		action: func() error {
+			return wb.Repository.DeleteBlock(blockID)
+		},
+		debugInfo: fmt.Sprintf("Delete(%s)", blockID),
+	}
+	return nil
+}
+
+func (wb *writeBackRepository) Flush() error {
+	rwg := sync.WaitGroup{}
+	rwg.Add(1)
+
+	// Create a wait group that all workers will join.
+	wg := sync.WaitGroup{}
+	wg.Add(wb.workerCount)
+
+	// Send a request to each worker that causes it to report to the wait group and pause.
+	for n := 0; n < wb.workerCount; n++ {
+		wb.channel <- writeBackRequest{
+			workerPaused:  &wg,
+			workerRelease: &rwg,
+		}
+	}
+
+	// Wait until all workers join the wait group.
+	wg.Wait()
+
+	// Now release them all.
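+	// Every worker is now parked on workerRelease with the queue drained, so all
+	// requests submitted before Flush() have completed.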
+	rwg.Done()
+
+	return wb.Repository.Flush()
+}
+
+func (wb *writeBackRepository) processRequest(req writeBackRequest) {
+	if req.workerPaused != nil {
+		req.workerPaused.Done()
+		req.workerRelease.Wait()
+		return
+	}
+	if wb.getDeferredError() != nil {
+		return
+	}
+
+	err := req.action()
+	if err != nil {
+		wb.deferredError.Store(err)
+	}
+}
+
+// NewWriteBackWrapper returns a Repository wrapper that processes writes asynchronously using the specified
+// number of worker goroutines. This wrapper is best used with Repositories that exhibit high latency.
+func NewWriteBackWrapper(wrapped Repository, workerCount int) Repository {
+	ch := make(chan writeBackRequest, workerCount)
+	result := &writeBackRepository{
+		Repository:  wrapped,
+		channel:     ch,
+		workerCount: workerCount,
+	}
+
+	for i := 0; i < workerCount; i++ {
+		go func() {
+			for req := range ch {
+				result.processRequest(req)
+			}
+		}()
+	}
+
+	return result
+}
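+
+// A composition sketch (the worker count is illustrative): the wrappers in this
+// package can be stacked, e.g. asynchronous write-back on top of logging on top
+// of any base repository:
+//
+//	base := NewMapRepository(map[string][]byte{})
+//	repo := NewWriteBackWrapper(NewLoggingWrapper(base), 4)
+//	// PutBlock and DeleteBlock now return immediately...
+//	repo.Flush() // ...and Flush waits until all queued writes have completed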