From 3cf64c9fcfcd114a1d72444e04fa264bb59eb344 Mon Sep 17 00:00:00 2001
From: Jarek Kowalski
Date: Sun, 27 Mar 2016 07:44:22 -0700
Subject: [PATCH] beginnings of uploader

---
 dir/entry.go       |  23 +++---
 dir/lister.go      |  39 ++++++++--
 dir/reader.go      |   9 ++
 dir/upload.go      | 199 +++++++++++++++++++++++++++++++++++++++++++++
 dir/upload_test.go |  87 ++++++++++++++++++++
 5 files changed, 338 insertions(+), 19 deletions(-)
 create mode 100644 dir/reader.go
 create mode 100644 dir/upload.go
 create mode 100644 dir/upload_test.go

diff --git a/dir/entry.go b/dir/entry.go
index 4b7a54e76..cd8315b4c 100644
--- a/dir/entry.go
+++ b/dir/entry.go
@@ -55,21 +55,20 @@ func FileModeToType(mode os.FileMode) EntryType {
 	}
 }
 
-// Opener opens the contents of directory entry for reading.
-type Opener func() (io.ReadCloser, error)
-
 // Entry stores attributes of a single entry in a directory.
 type Entry struct {
-	Name     string
-	Size     int64
-	Type     EntryType
-	ModTime  time.Time
-	Mode     int16 // 0000 .. 0777
-	UserID   uint32
-	GroupID  uint32
-	ObjectID content.ObjectID
+	Name          string
+	Size          int64
+	Type          EntryType
+	ModTime       time.Time
+	Mode          int16 // 0000 .. 0777
+	UserID        uint32
+	GroupID       uint32
+	ObjectID      content.ObjectID
+	MetadataCRC32 uint32
 
-	Open Opener
+	List func() (Listing, error)
+	Open func() (io.ReadCloser, error)
 }
 
 func (e *Entry) String() string {
diff --git a/dir/lister.go b/dir/lister.go
index 76bc0fec8..b25c8057f 100644
--- a/dir/lister.go
+++ b/dir/lister.go
@@ -6,6 +6,7 @@
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"sort"
 )
 
 // Listing encapsulates list of items in a directory.
@@ -14,6 +15,34 @@ type Listing struct {
 	Files       []*Entry
 }
 
+// findEntryByName returns the entry with the given name, or nil if there is
+// none. It uses binary search, so entries must be sorted by Name in ascending
+// order.
+func findEntryByName(entries []*Entry, name string) *Entry {
+	i := sort.Search(
+		len(entries),
+		func(i int) bool { return entries[i].Name >= name },
+	)
+
+	if i < len(entries) && entries[i].Name == name {
+		return entries[i]
+	}
+
+	return nil
+}
+
+// FindDirectoryByName returns the directory entry with the given name, or nil
+// if the listing has no such directory.
+func (l Listing) FindDirectoryByName(name string) *Entry {
+	return findEntryByName(l.Directories, name)
+}
+
+// FindFileByName returns the file entry with the given name, or nil if the
+// listing has no such file.
+func (l Listing) FindFileByName(name string) *Entry {
+	return findEntryByName(l.Files, name)
+}
+
 func (l Listing) String() string {
 	s := ""
 	for i, d := range l.Directories {
@@ -68,19 +97,15 @@ func NewFilesystemLister() Lister {
 	return &filesystemLister{}
 }
 
-func openFileFunc(parentDir string, fi os.FileInfo) Opener {
-	return func() (io.ReadCloser, error) {
-		return os.Open(filepath.Join(parentDir, fi.Name()))
-	}
-}
-
 func entryFromFileSystemInfo(parentDir string, fi os.FileInfo) (*Entry, error) {
 	e := &Entry{
 		Name:    fi.Name(),
 		Mode:    int16(fi.Mode().Perm()),
 		ModTime: fi.ModTime(),
 		Type:    FileModeToType(fi.Mode()),
-		Open:    openFileFunc(parentDir, fi),
+		Open: func() (io.ReadCloser, error) {
+			return os.Open(filepath.Join(parentDir, fi.Name()))
+		},
 	}
 
 	if e.Type == EntryTypeFile {
diff --git a/dir/reader.go b/dir/reader.go
new file mode 100644
index 000000000..a40bf0744
--- /dev/null
+++ b/dir/reader.go
@@ -0,0 +1,9 @@
+package dir
+
+import "io"
+
+// ReadListing is a placeholder: it does not read from r yet and always
+// returns an empty Listing and a nil error.
+func ReadListing(r io.Reader) (Listing, error) {
+	return Listing{}, nil
+}
diff --git a/dir/upload.go b/dir/upload.go
new file mode 100644
index 000000000..e052a1f0e
--- /dev/null
+++ b/dir/upload.go
@@ -0,0 +1,199 @@
+package dir
+
+import (
+	"encoding/binary"
+	"fmt"
+	"hash/crc32"
+	"io"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/kopia/kopia/cas"
+	"github.com/kopia/kopia/content"
+)
+
+// uploader stores files and directory trees as objects in an ObjectManager,
+// using lister to enumerate directory contents.
+type uploader struct {
+	mgr    cas.ObjectManager
+	lister Lister
+}
+
+// UploadDir uploads the directory tree rooted at path and returns the ID of
+// the object holding its top-level listing. previous is the object ID of an
+// earlier upload of the same directory, or "" if there is none.
+func (u uploader) UploadDir(path string, previous content.ObjectID) (content.ObjectID, error) {
+	// The metadata checksum of the previous upload is not known here, so 0 is passed.
+	oid, _, err := u.uploadDirInternal(path, previous, 0)
+
+	return oid, err
+}
+
+// UploadFile uploads the contents of the file at path and returns its object ID.
+func (u uploader) UploadFile(path string) (content.ObjectID, error) {
+	oid, _, err := u.uploadFileInternal(path)
+	return oid, err
+}
+
+// uploadFileInternal uploads the file at path and returns its object ID along
+// with a CRC32 checksum of the file's base name, size and modification time.
+func (u uploader) uploadFileInternal(path string) (content.ObjectID, uint32, error) {
+	file, err := os.Open(path)
+	if err != nil {
+		return content.NullObjectID, 0, fmt.Errorf("unable to open file %s: %v", path, err)
+	}
+	defer file.Close()
+
+	writer := u.mgr.NewWriter(
+		cas.WithDescription("FILE:"+path),
+		cas.WithBlockNamePrefix("F"),
+	)
+	defer writer.Close()
+
+	// A failed or partial copy must not be reported as a successful upload.
+	if _, err := io.Copy(writer, file); err != nil {
+		return content.NullObjectID, 0, fmt.Errorf("unable to upload file %s: %v", path, err)
+	}
+
+	result, err := writer.Result(false)
+	if err != nil {
+		return content.NullObjectID, 0, err
+	}
+
+	// Stat can fail, in which case s is nil and must not be dereferenced.
+	s, err := file.Stat()
+	if err != nil {
+		return content.NullObjectID, 0, fmt.Errorf("unable to stat file %s: %v", path, err)
+	}
+
+	return result, computeChecksum(filepath.Base(file.Name()), s.Size(), s.ModTime()), nil
+}
+
+// uploadDirInternal uploads the directory at path, recursing into
+// subdirectories before uploading files, and returns the object ID of the
+// directory listing together with the directory's metadata checksum.
+//
+// previous and previousCRC32 describe an earlier upload of the same directory;
+// when the newly computed checksum matches previousCRC32 the previous object
+// ID is returned and nothing is uploaded for this directory.
+func (u uploader) uploadDirInternal(path string, previous content.ObjectID, previousCRC32 uint32) (content.ObjectID, uint32, error) {
+	listing, err := u.lister.List(path)
+	if err != nil {
+		return content.NullObjectID, 0, err
+	}
+
+	var previousListing Listing
+
+	if previous != "" {
+		// Best effort: if the previous listing cannot be opened or read,
+		// continue with an empty one and upload everything again.
+		// NOTE(review): d is never closed; confirm whether Open returns a closer.
+		d, err := u.mgr.Open(previous)
+		if err == nil {
+			previousListing, _ = ReadListing(d)
+		}
+	}
+
+	h := crc32.NewIEEE()
+
+	// Process all directories first, in canonical order before any files.
+	for _, d := range listing.Directories {
+		p := previousListing.FindDirectoryByName(d.Name)
+		var prevObjectID content.ObjectID
+		var prevCRC uint32
+
+		if p != nil {
+			prevCRC = p.MetadataCRC32
+			prevObjectID = p.ObjectID
+		}
+
+		objectID, metadataChecksum, err := u.uploadDirInternal(
+			filepath.Join(path, d.Name),
+			prevObjectID,
+			prevCRC,
+		)
+		if err != nil {
+			return content.NullObjectID, 0, err
+		}
+
+		d.ObjectID = objectID
+		h.Write([]byte{0})
+		binary.Write(h, binary.LittleEndian, metadataChecksum)
+	}
+
+	// Fold the metadata checksum of every file into the directory checksum.
+	// NOTE(review): nothing in this file sets f.MetadataCRC32; confirm that the
+	// lister fills it in, otherwise file changes do not affect the checksum.
+	for _, f := range listing.Files {
+		h.Write([]byte{0})
+		binary.Write(h, binary.LittleEndian, f.MetadataCRC32)
+	}
+
+	dirMetadataChecksum := h.Sum32()
+
+	// Reuse the previous object only when there is one and its checksum is
+	// known. Without these guards an empty directory (checksum 0) matches the
+	// zero "unknown" checksum and the null or a stale object ID is returned.
+	if previous != "" && previousCRC32 != 0 && dirMetadataChecksum == previousCRC32 {
+		return previous, dirMetadataChecksum, nil
+	}
+
+	// Upload all files.
+	for _, f := range listing.Files {
+		fullPath := filepath.Join(path, f.Name)
+		if f.ObjectID == content.NullObjectID {
+			f.ObjectID, err = u.UploadFile(fullPath)
+			if err != nil {
+				return content.NullObjectID, 0, fmt.Errorf("unable to upload file %s: %v", fullPath, err)
+			}
+		}
+	}
+
+	writer := u.mgr.NewWriter(
+		cas.WithDescription("DIR:"+path),
+		cas.WithBlockNamePrefix("D"),
+	)
+	defer writer.Close()
+
+	dw := NewWriter(writer)
+
+	for _, d := range listing.Directories {
+		if err := dw.WriteEntry(d); err != nil {
+			return "", 0, err
+		}
+	}
+	for _, f := range listing.Files {
+		if err := dw.WriteEntry(f); err != nil {
+			return "", 0, err
+		}
+	}
+
+	// Assign to the function-level err: declaring it in an if statement would
+	// shadow it and a failure here would be returned as a nil error.
+	directoryObjectID, err := writer.Result(true)
+	if err != nil {
+		return "", 0, err
+	}
+
+	return directoryObjectID, dirMetadataChecksum, nil
+}
+
+// computeChecksum returns the IEEE CRC32 of size, modTime (as Unix
+// nanoseconds) and fileName, written in that order in little-endian form.
+func computeChecksum(fileName string, size int64, modTime time.Time) uint32 {
+	hash := crc32.NewIEEE()
+	binary.Write(hash, binary.LittleEndian, size)
+	binary.Write(hash, binary.LittleEndian, modTime.UnixNano())
+	binary.Write(hash, binary.LittleEndian, []byte(fileName))
+	return hash.Sum32()
+}
+
+// entryObjectIDOrEmpty returns the object ID of e, or the null object ID if e is nil.
+func entryObjectIDOrEmpty(e *Entry) content.ObjectID {
+	if e == nil {
+		return content.NullObjectID
+	}
+
+	return e.ObjectID
+}
diff --git a/dir/upload_test.go b/dir/upload_test.go
new file mode 100644
index 000000000..1e08db4c0
--- /dev/null
+++ b/dir/upload_test.go
@@ -0,0 +1,87 @@
+package dir
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/kopia/kopia/cas"
+	"github.com/kopia/kopia/storage"
+)
+
+// TestUpload uploads a small directory tree into a filesystem-backed
+// repository and checks that a directory object ID is returned.
+func TestUpload(t *testing.T) {
+	sourceDir, err := ioutil.TempDir("", "kopia-src")
+	if err != nil {
+		t.Fatalf("cannot create temp directory: %v", err)
+	}
+	// Registered before the tree is populated so cleanup runs on every exit path.
+	defer os.RemoveAll(sourceDir)
+
+	for _, d := range []string{"d1/d1", "d1/d2", "d2/d1"} {
+		if err := os.MkdirAll(filepath.Join(sourceDir, d), 0777); err != nil {
+			t.Fatalf("cannot create directory %v: %v", d, err)
+		}
+	}
+
+	files := []struct {
+		name string
+		data []byte
+	}{
+		{"f1", []byte{1, 2, 3}},
+		{"f2", []byte{1, 2, 3, 4}},
+		{"f3", []byte{1, 2, 3, 4, 5}},
+		{"d1/d1/f1", []byte{1, 2, 3}},
+		{"d1/d1/f2", []byte{1, 2, 3, 4}},
+		{"d1/f2", []byte{1, 2, 3, 4}},
+		{"d1/d2/f1", []byte{1, 2, 3}},
+		{"d1/d2/f2", []byte{1, 2, 3, 4}},
+		{"d2/d1/f1", []byte{1, 2, 3}},
+		{"d2/d1/f2", []byte{1, 2, 3, 4}},
+	}
+	for _, f := range files {
+		if err := ioutil.WriteFile(filepath.Join(sourceDir, f.name), f.data, 0777); err != nil {
+			t.Fatalf("cannot write file %v: %v", f.name, err)
+		}
+	}
+
+	repoDir, err := ioutil.TempDir("", "kopia-repo")
+	if err != nil {
+		t.Fatalf("cannot create temp directory: %v", err)
+	}
+	defer os.RemoveAll(repoDir)
+
+	format := cas.Format{
+		Version: "1",
+		Hash:    "md5",
+	}
+
+	repo, err := storage.NewFSRepository(&storage.FSRepositoryOptions{
+		Path: repoDir,
+	})
+	if err != nil {
+		t.Fatalf("unable to create repo: %v", err)
+	}
+
+	objectManager, err := cas.NewObjectManager(repo, format)
+	if err != nil {
+		t.Fatalf("unable to create object manager: %v", err)
+	}
+
+	u := &uploader{
+		mgr:    objectManager,
+		lister: NewFilesystemLister(),
+	}
+
+	oid, err := u.UploadDir(sourceDir, "")
+	if err != nil {
+		t.Fatalf("upload failed: %v", err)
+	}
+	if oid == "" {
+		t.Errorf("expected a non-empty object ID for %v", sourceDir)
+	}
+
+	t.Logf("oid: %v", oid)
+}