beginnings of uploader

This commit is contained in:
Jarek Kowalski
2016-03-27 07:44:22 -07:00
parent 28b0d3d47f
commit 3cf64c9fcf
5 changed files with 286 additions and 19 deletions

View File

@@ -55,21 +55,20 @@ func FileModeToType(mode os.FileMode) EntryType {
}
}
// Opener opens the contents of directory entry for reading.
type Opener func() (io.ReadCloser, error)
// Entry stores attributes of a single entry in a directory.
type Entry struct {
Name string
Size int64
Type EntryType
ModTime time.Time
Mode int16 // 0000 .. 0777
UserID uint32
GroupID uint32
ObjectID content.ObjectID
Name string
Size int64
Type EntryType
ModTime time.Time
Mode int16 // 0000 .. 0777
UserID uint32
GroupID uint32
ObjectID content.ObjectID
MetadataCRC32 uint32
Open Opener
List func() (Listing, error)
Open func() (io.ReadCloser, error)
}
func (e *Entry) String() string {

View File

@@ -6,6 +6,7 @@
"io/ioutil"
"os"
"path/filepath"
"sort"
)
// Listing encapsulates list of items in a directory.
@@ -14,6 +15,27 @@ type Listing struct {
Files []*Entry
}
func findEntryByName(entries []*Entry, name string) *Entry {
i := sort.Search(
len(entries),
func(i int) bool { return entries[i].Name >= name },
)
if i < len(entries) && entries[i].Name == name {
return entries[i]
}
return nil
}
func (l Listing) FindDirectoryByName(name string) *Entry {
return findEntryByName(l.Directories, name)
}
func (l Listing) FindFileByName(name string) *Entry {
return findEntryByName(l.Files, name)
}
func (l Listing) String() string {
s := ""
for i, d := range l.Directories {
@@ -68,19 +90,15 @@ func NewFilesystemLister() Lister {
return &filesystemLister{}
}
func openFileFunc(parentDir string, fi os.FileInfo) Opener {
return func() (io.ReadCloser, error) {
return os.Open(filepath.Join(parentDir, fi.Name()))
}
}
func entryFromFileSystemInfo(parentDir string, fi os.FileInfo) (*Entry, error) {
e := &Entry{
Name: fi.Name(),
Mode: int16(fi.Mode().Perm()),
ModTime: fi.ModTime(),
Type: FileModeToType(fi.Mode()),
Open: openFileFunc(parentDir, fi),
Open: func() (io.ReadCloser, error) {
return os.Open(filepath.Join(parentDir, fi.Name()))
},
}
if e.Type == EntryTypeFile {

7
dir/reader.go Normal file
View File

@@ -0,0 +1,7 @@
package dir
import "io"
func ReadListing(r io.Reader) (Listing, error) {
return Listing{}, nil
}

162
dir/upload.go Normal file
View File

@@ -0,0 +1,162 @@
package dir
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"os"
"path/filepath"
"time"
"github.com/kopia/kopia/cas"
"github.com/kopia/kopia/content"
)
type uploader struct {
mgr cas.ObjectManager
lister Lister
}
func (u uploader) UploadDir(path string, previous content.ObjectID) (content.ObjectID, error) {
oid, _, err := u.uploadDirInternal(path, previous, 0)
return oid, err
}
func (u uploader) UploadFile(path string) (content.ObjectID, error) {
oid, _, err := u.uploadFileInternal(path)
return oid, err
}
func (u uploader) uploadFileInternal(path string) (content.ObjectID, uint32, error) {
file, err := os.Open(path)
if err != nil {
return content.NullObjectID, 0, fmt.Errorf("unable to open file %s: %v", path, err)
}
defer file.Close()
writer := u.mgr.NewWriter(
cas.WithDescription("FILE:"+path),
cas.WithBlockNamePrefix("F"),
)
defer writer.Close()
io.Copy(writer, file)
result, err := writer.Result(false)
if err != nil {
return content.NullObjectID, 0, err
}
s, err := file.Stat()
return result, computeChecksum(filepath.Base(file.Name()), s.Size(), s.ModTime()), nil
}
func (u uploader) uploadDirInternal(path string, previous content.ObjectID, previousCRC32 uint32) (content.ObjectID, uint32, error) {
listing, err := u.lister.List(path)
if err != nil {
return content.NullObjectID, 0, err
}
var previousListing Listing
if previous != "" {
d, err := u.mgr.Open(previous)
if err == nil {
previousListing, _ = ReadListing(d)
}
}
h := crc32.NewIEEE()
// Process all directories first, in canonical order before any files.
for _, d := range listing.Directories {
p := previousListing.FindDirectoryByName(d.Name)
var prevObjectID content.ObjectID
var prevCRC uint32
if p != nil {
prevCRC = p.MetadataCRC32
prevObjectID = p.ObjectID
}
objectID, metadataChecksum, err := u.uploadDirInternal(
filepath.Join(path, d.Name),
prevObjectID,
prevCRC,
)
if err != nil {
return content.NullObjectID, 0, err
}
d.ObjectID = objectID
h.Write([]byte{0})
binary.Write(h, binary.LittleEndian, metadataChecksum)
}
// Process files, for each file read cached chunk ID from streaming cache.
for _, f := range listing.Files {
h.Write([]byte{0})
binary.Write(h, binary.LittleEndian, f.MetadataCRC32)
}
dirMetadataChecksum := h.Sum32()
if dirMetadataChecksum == previousCRC32 {
return previous, dirMetadataChecksum, nil
}
// Upload all files.
for _, f := range listing.Files {
fullPath := filepath.Join(path, f.Name)
if f.ObjectID == content.NullObjectID {
f.ObjectID, err = u.UploadFile(fullPath)
if err != nil {
return content.NullObjectID, 0, fmt.Errorf("unable to hash file: %s", err)
}
}
}
writer := u.mgr.NewWriter(
cas.WithDescription("DIR:"+path),
cas.WithBlockNamePrefix("D"),
)
defer writer.Close()
dw := NewWriter(writer)
for _, d := range listing.Directories {
if err := dw.WriteEntry(d); err != nil {
return "", 0, err
}
}
for _, f := range listing.Files {
if err := dw.WriteEntry(f); err != nil {
return "", 0, err
}
}
if directoryObjectID, err := writer.Result(true); err == nil {
return directoryObjectID, dirMetadataChecksum, nil
}
return "", 0, err
}
func computeChecksum(fileName string, size int64, modTime time.Time) uint32 {
hash := crc32.NewIEEE()
binary.Write(hash, binary.LittleEndian, size)
binary.Write(hash, binary.LittleEndian, modTime.UnixNano())
binary.Write(hash, binary.LittleEndian, []byte(fileName))
return hash.Sum32()
}
func entryObjectIDOrEmpty(e *Entry) content.ObjectID {
if e == nil {
return content.NullObjectID
}
return e.ObjectID
}

81
dir/upload_test.go Normal file
View File

@@ -0,0 +1,81 @@
package dir
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"github.com/kopia/kopia/cas"
"github.com/kopia/kopia/storage"
"testing"
)
func TestUpload(t *testing.T) {
var err error
sourceDir, err := ioutil.TempDir("", "kopia-src")
if err != nil {
t.Errorf("cannot create temp directory: %v", err)
return
}
os.MkdirAll(filepath.Join(sourceDir, "d1/d1"), 0777)
os.MkdirAll(filepath.Join(sourceDir, "d1/d2"), 0777)
os.MkdirAll(filepath.Join(sourceDir, "d2/d1"), 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "f1"), []byte{1, 2, 3}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "f2"), []byte{1, 2, 3, 4}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "f3"), []byte{1, 2, 3, 4, 5}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "d1/d1/f1"), []byte{1, 2, 3}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "d1/d1/f2"), []byte{1, 2, 3, 4}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "d1/f2"), []byte{1, 2, 3, 4}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "d1/d2/f1"), []byte{1, 2, 3}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "d1/d2/f2"), []byte{1, 2, 3, 4}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "d2/d1/f1"), []byte{1, 2, 3}, 0777)
ioutil.WriteFile(filepath.Join(sourceDir, "d2/d1/f2"), []byte{1, 2, 3, 4}, 0777)
defer os.RemoveAll(sourceDir)
repoDir, err := ioutil.TempDir("", "kopia-repo")
if err != nil {
t.Errorf("cannot create temp directory: %v", err)
return
}
defer os.RemoveAll(repoDir)
format := cas.Format{
Version: "1",
Hash: "md5",
}
repo, err := storage.NewFSRepository(&storage.FSRepositoryOptions{
Path: repoDir,
})
if err != nil {
t.Errorf("unable to create repo: %v", err)
return
}
objectManager, err := cas.NewObjectManager(repo, format)
if err != nil {
t.Errorf("unable to create object manager: %v", err)
return
}
u := &uploader{
mgr: objectManager,
lister: NewFilesystemLister(),
}
oid, err := u.UploadDir(sourceDir, "")
if err != nil {
t.Errorf("upload failed: %v", err)
}
log.Printf("oid: %v", oid)
}