mirror of
https://github.com/kopia/kopia.git
synced 2026-03-14 20:26:51 -04:00
upload stats
This commit is contained in:
@@ -39,7 +39,7 @@ func (hls *highLatencyStorage) GetBlock(id blob.BlockID) ([]byte, error) {
|
||||
return hls.Storage.GetBlock(id)
|
||||
}
|
||||
|
||||
func uploadAndTime(omgr cas.ObjectManager, dir string, previous cas.ObjectID) (cas.ObjectID, cas.ObjectID) {
|
||||
func uploadAndTime(omgr cas.ObjectManager, dir string, previous cas.ObjectID) *fs.UploadResult {
|
||||
log.Println("---")
|
||||
uploader, err := fs.NewUploader(omgr)
|
||||
if err != nil {
|
||||
@@ -48,15 +48,15 @@ func uploadAndTime(omgr cas.ObjectManager, dir string, previous cas.ObjectID) (c
|
||||
|
||||
omgr.ResetStats()
|
||||
t0 := time.Now()
|
||||
oid, manifestOID, err := uploader.UploadDir(dir, previous)
|
||||
res, err := uploader.UploadDir(dir, previous)
|
||||
if err != nil {
|
||||
log.Fatalf("Error uploading: %v", err)
|
||||
}
|
||||
dt := time.Since(t0)
|
||||
|
||||
log.Printf("Uploaded: %v in %v", oid, dt)
|
||||
log.Printf("Uploaded: %v in %v", res.ObjectID, dt)
|
||||
log.Printf("Stats: %#v", omgr.Stats())
|
||||
return oid, manifestOID
|
||||
return res
|
||||
}
|
||||
|
||||
type subdirEntry struct {
|
||||
@@ -228,28 +228,8 @@ func main() {
|
||||
|
||||
path := "/Users/jarek/Projects/Kopia/src/github.com/kopia/"
|
||||
|
||||
_, manifestOID := uploadAndTime(omgr, path, "")
|
||||
log.Printf("second time")
|
||||
uploadAndTime(omgr, path, manifestOID)
|
||||
log.Printf("finished second time")
|
||||
//readCached(omgr, manifestOID)
|
||||
// time.Sleep(1 * time.Second)
|
||||
// for i := 0; i < 1; i++ {
|
||||
// t0 := time.Now()
|
||||
// c := 0
|
||||
// d := 0
|
||||
// allGantt = nil
|
||||
// for e := range walkTree(omgr, "BASE/", oid) {
|
||||
// //log.Printf("e: %v %v", e.Name(), e.ObjectID())
|
||||
// if e.IsDir() {
|
||||
// d++
|
||||
// }
|
||||
// c++
|
||||
// }
|
||||
// dt := time.Since(t0)
|
||||
// log.Printf("walk took %v and returned %v (%v dirs)", dt, c, d)
|
||||
// // for _, e := range allGantt {
|
||||
// // fmt.Printf("%v,%v,%v\n", e.dir, e.from.Sub(t0).Nanoseconds()/1000, e.to.Sub(e.from).Nanoseconds()/1000)
|
||||
// // }
|
||||
// }
|
||||
r1 := uploadAndTime(omgr, path, "")
|
||||
log.Printf("finished: %#v", *r1)
|
||||
r2 := uploadAndTime(omgr, path, r1.ManifestID)
|
||||
log.Printf("finished second time: %#v", *r2)
|
||||
}
|
||||
|
||||
53
fs/upload.go
53
fs/upload.go
@@ -18,10 +18,22 @@
|
||||
// ErrUploadCancelled is returned when the upload gets cancelled.
|
||||
var ErrUploadCancelled = errors.New("upload cancelled")
|
||||
|
||||
type UploadResult struct {
|
||||
ObjectID cas.ObjectID
|
||||
ManifestID cas.ObjectID
|
||||
|
||||
Stats struct {
|
||||
CachedDirectories int
|
||||
CachedFiles int
|
||||
NonCachedDirectories int
|
||||
NonCachedFiles int
|
||||
}
|
||||
}
|
||||
|
||||
// Uploader supports efficient uploading files and directories to CAS.
|
||||
type Uploader interface {
|
||||
UploadFile(path string) (cas.ObjectID, error)
|
||||
UploadDir(path string, previousObjectID cas.ObjectID) (objectID cas.ObjectID, manifestObjectID cas.ObjectID, err error)
|
||||
UploadDir(path string, previousManifestID cas.ObjectID) (*UploadResult, error)
|
||||
Cancel()
|
||||
}
|
||||
|
||||
@@ -58,13 +70,14 @@ func (u *uploader) UploadFile(path string) (cas.ObjectID, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (u *uploader) UploadDir(path string, previous cas.ObjectID) (cas.ObjectID, cas.ObjectID, error) {
|
||||
func (u *uploader) UploadDir(path string, previousManifestID cas.ObjectID) (*UploadResult, error) {
|
||||
//log.Printf("UploadDir", path)
|
||||
//defer log.Printf("finishing UploadDir", path)
|
||||
var hcr hashcacheReader
|
||||
var err error
|
||||
|
||||
if previous != "" {
|
||||
if r, err := u.mgr.Open(previous); err == nil {
|
||||
if previousManifestID != "" {
|
||||
if r, err := u.mgr.Open(previousManifestID); err == nil {
|
||||
if dr, err := newDirectoryReader(r); err == nil {
|
||||
hcr.Open(dr)
|
||||
}
|
||||
@@ -76,22 +89,30 @@ func (u *uploader) UploadDir(path string, previous cas.ObjectID) (cas.ObjectID,
|
||||
cas.WithBlockNamePrefix("H"),
|
||||
)
|
||||
dw := newDirectoryWriter(manifestWriter)
|
||||
oid, _, err := u.uploadDirInternal(path, ".", dw, &hcr)
|
||||
|
||||
result := &UploadResult{}
|
||||
result.ObjectID, _, err = u.uploadDirInternal(result, path, ".", dw, &hcr)
|
||||
if err != nil {
|
||||
dw.Close()
|
||||
return oid, cas.NullObjectID, err
|
||||
return result, err
|
||||
}
|
||||
|
||||
err = dw.Close()
|
||||
if err != nil {
|
||||
return oid, cas.NullObjectID, err
|
||||
return result, err
|
||||
}
|
||||
|
||||
manifestOid, err := manifestWriter.Result(true)
|
||||
return oid, manifestOid, nil
|
||||
result.ManifestID, err = manifestWriter.Result(true)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (u *uploader) uploadDirInternal(path string, relativePath string, hcw *directoryWriter, hcr *hashcacheReader) (cas.ObjectID, bool, error) {
|
||||
func (u *uploader) uploadDirInternal(
|
||||
result *UploadResult,
|
||||
path string,
|
||||
relativePath string,
|
||||
hcw *directoryWriter,
|
||||
hcr *hashcacheReader,
|
||||
) (cas.ObjectID, bool, error) {
|
||||
dir, err := u.lister.List(path)
|
||||
if err != nil {
|
||||
return cas.NullObjectID, false, err
|
||||
@@ -112,7 +133,7 @@ func (u *uploader) uploadDirInternal(path string, relativePath string, hcw *dire
|
||||
entryRelativePath := relativePath + "/" + e.Name
|
||||
|
||||
if e.IsDir() {
|
||||
oid, wasCached, err := u.uploadDirInternal(fullPath, entryRelativePath, hcw, hcr)
|
||||
oid, wasCached, err := u.uploadDirInternal(result, fullPath, entryRelativePath, hcw, hcr)
|
||||
if err != nil {
|
||||
return cas.NullObjectID, false, err
|
||||
}
|
||||
@@ -129,9 +150,11 @@ func (u *uploader) uploadDirInternal(path string, relativePath string, hcw *dire
|
||||
allCached = allCached && cacheMatches && numSkipped == 0
|
||||
|
||||
if cacheMatches {
|
||||
result.Stats.CachedFiles++
|
||||
// Avoid hashing by reusing previous object ID.
|
||||
e.ObjectID = cachedEntry.ObjectID
|
||||
} else {
|
||||
result.Stats.NonCachedFiles++
|
||||
e.ObjectID, err = u.UploadFile(fullPath)
|
||||
if err != nil {
|
||||
return cas.NullObjectID, false, fmt.Errorf("unable to hash file: %s", err)
|
||||
@@ -155,13 +178,15 @@ func (u *uploader) uploadDirInternal(path string, relativePath string, hcw *dire
|
||||
|
||||
var directoryOID cas.ObjectID
|
||||
|
||||
dirEntry, numSkipped := hcr.GetEntry(relativePath + "/")
|
||||
allCached = allCached && dirEntry != nil && numSkipped == 0
|
||||
cachedDirEntry, numSkipped := hcr.GetEntry(relativePath + "/")
|
||||
allCached = allCached && cachedDirEntry != nil && numSkipped == 0
|
||||
|
||||
if allCached {
|
||||
// Avoid hashing directory listing if every entry matched the cache.
|
||||
return dirEntry.ObjectID, true, nil
|
||||
result.Stats.CachedDirectories++
|
||||
return cachedDirEntry.ObjectID, true, nil
|
||||
} else {
|
||||
result.Stats.NonCachedDirectories++
|
||||
directoryOID, err = writer.Result(true)
|
||||
return directoryOID, false, err
|
||||
}
|
||||
|
||||
@@ -73,51 +73,51 @@ func TestUpload(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
oid, metadataOID, err := u.UploadDir(sourceDir, "")
|
||||
r1, err := u.UploadDir(sourceDir, "")
|
||||
if err != nil {
|
||||
t.Errorf("upload failed: %v", err)
|
||||
}
|
||||
|
||||
oid2, metadataOID2, err := u.UploadDir(sourceDir, oid)
|
||||
r2, err := u.UploadDir(sourceDir, r1.ObjectID)
|
||||
if err != nil {
|
||||
t.Errorf("upload failed: %v", err)
|
||||
}
|
||||
|
||||
if oid2 != oid {
|
||||
t.Errorf("expected oid==oid2, got %v and %v", oid, oid2)
|
||||
if r2.ObjectID != r1.ObjectID {
|
||||
t.Errorf("expected r1.ObjectID==r2.ObjectID, got %v and %v", r1.ObjectID, r2.ObjectID)
|
||||
}
|
||||
|
||||
if metadataOID2 != metadataOID {
|
||||
t.Errorf("expected metadataOID2==metadataOID, got %v and %v", metadataOID2, metadataOID)
|
||||
if r2.ManifestID != r1.ManifestID {
|
||||
t.Errorf("expected r2.ManifestID==r1.ManifestID, got %v and %v", r2.ManifestID, r1.ManifestID)
|
||||
}
|
||||
|
||||
// Add one more file, the oid should change.
|
||||
// Add one more file, the r1.ObjectID should change.
|
||||
ioutil.WriteFile(filepath.Join(sourceDir, "d2/d1/f3"), []byte{1, 2, 3, 4, 5}, 0777)
|
||||
oid3, metadataOID3, err := u.UploadDir(sourceDir, oid)
|
||||
r3, err := u.UploadDir(sourceDir, r1.ObjectID)
|
||||
if err != nil {
|
||||
t.Errorf("upload failed: %v", err)
|
||||
}
|
||||
|
||||
if oid2 == oid3 {
|
||||
t.Errorf("expected oid3!=oid2, got %v", oid3)
|
||||
if r2.ObjectID == r3.ObjectID {
|
||||
t.Errorf("expected r3.ObjectID!=r2.ObjectID, got %v", r3.ObjectID)
|
||||
}
|
||||
|
||||
if metadataOID2 == metadataOID3 {
|
||||
t.Errorf("expected metadataOID3!=metadataOID2, got %v", metadataOID3)
|
||||
if r2.ManifestID == r3.ManifestID {
|
||||
t.Errorf("expected r3.ManifestID!=r2.ManifestID, got %v", r3.ManifestID)
|
||||
}
|
||||
|
||||
// Now remove the added file, OID should be identical to the original before the file got added.
|
||||
os.Remove(filepath.Join(sourceDir, "d2/d1/f3"))
|
||||
|
||||
oid4, metadataOID4, err := u.UploadDir(sourceDir, "")
|
||||
r4, err := u.UploadDir(sourceDir, "")
|
||||
if err != nil {
|
||||
t.Errorf("upload failed: %v", err)
|
||||
}
|
||||
|
||||
if oid4 != oid {
|
||||
t.Errorf("expected oid3==oid, got %v and %v", oid4, oid)
|
||||
if r4.ObjectID != r1.ObjectID {
|
||||
t.Errorf("expected r3.ObjectID==r1.ObjectID, got %v and %v", r4.ObjectID, r1.ObjectID)
|
||||
}
|
||||
if metadataOID4 != metadataOID {
|
||||
t.Errorf("expected metadataOID3==metadataOID4, got %v and %v", metadataOID4, metadataOID)
|
||||
if r4.ManifestID != r1.ManifestID {
|
||||
t.Errorf("expected r3.ManifestID==r4.ManifestID, got %v and %v", r4.ManifestID, r1.ManifestID)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user