From fa5eae501ae75c80fdf74f70dceac678a3687d5c Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 19 Mar 2018 11:56:16 -0700 Subject: [PATCH] removed cleanup for now --- cli/command_object_cleanup.go | 295 ---------------------------------- 1 file changed, 295 deletions(-) delete mode 100644 cli/command_object_cleanup.go diff --git a/cli/command_object_cleanup.go b/cli/command_object_cleanup.go deleted file mode 100644 index e458a88ca..000000000 --- a/cli/command_object_cleanup.go +++ /dev/null @@ -1,295 +0,0 @@ -package cli - -import ( - "container/list" - "fmt" - "strings" - "sync" - "time" - - "github.com/rs/zerolog/log" - - "github.com/kopia/kopia/fs" - "github.com/kopia/kopia/internal/units" - "github.com/kopia/kopia/object" - "github.com/kopia/kopia/repo" - "github.com/kopia/kopia/snapshot" - - kingpin "gopkg.in/alecthomas/kingpin.v2" -) - -var ( - cleanupCommand = objectCommands.Command("cleanup", "Remove old repository objects not used by any snapshots.").Alias("gc") - cleanupIgnoreAge = cleanupCommand.Flag("min-age", "Minimum block age to be considered for cleanup.").Default("24h").Duration() - - cleanupDelete = cleanupCommand.Flag("delete", "Whether to actually delete unused blocks.").Default("no").String() -) - -type cleanupWorkItem struct { - oid object.ID - isDirectory bool - debug string -} - -func (c *cleanupWorkItem) String() string { - return fmt.Sprintf("%v - %v", c.debug, c.oid.String()) -} - -type cleanupWorkQueue struct { - items *list.List - cond *sync.Cond - visited map[string]bool - processing int - totalCompleted int - totalPending int -} - -func (wq *cleanupWorkQueue) add(it *cleanupWorkItem) { - var os = it.oid.String() - - wq.cond.L.Lock() - if wq.visited[os] { - // Already processed. - wq.cond.L.Unlock() - return - } - - wq.visited[os] = true - wq.totalPending++ - - wq.items.PushBack(it) - wq.cond.Signal() - wq.cond.L.Unlock() -} - -func (wq *cleanupWorkQueue) get() (*cleanupWorkItem, bool) { - wq.cond.L.Lock() - for wq.items.Len() == 0 && wq.processing > 0 { - wq.cond.Wait() - } - - var v *cleanupWorkItem - - if wq.items.Len() > 0 { - f := wq.items.Front() - v = f.Value.(*cleanupWorkItem) - wq.items.Remove(f) - wq.processing++ - } else { - wq.cond.Signal() - } - wq.cond.L.Unlock() - - if v != nil { - return v, true - } - - return nil, false -} - -func (wq *cleanupWorkQueue) finished() { - wq.cond.L.Lock() - wq.processing-- - wq.totalCompleted++ - wq.cond.Signal() - wq.cond.L.Unlock() -} - -func (wq *cleanupWorkQueue) stats() (totalCompleted int, processing int, totalPending int) { - wq.cond.L.Lock() - totalCompleted = wq.totalCompleted - processing = wq.processing - totalPending = wq.totalPending - wq.cond.L.Unlock() - return -} - -type cleanupContext struct { - sync.Mutex - - repo *repo.Repository - mgr *snapshot.Manager - inuse map[string]bool - visited map[string]bool - queue *cleanupWorkQueue - - inuseCollector chan string -} - -func findAliveBlocks(ctx *cleanupContext, wi *cleanupWorkItem) error { - _, blks, err := ctx.repo.Objects.VerifyObject(wi.oid) - if err != nil { - return err - } - - for _, b := range blks { - ctx.inuseCollector <- b - } - - if wi.isDirectory { - entries, err := ctx.mgr.DirectoryEntry(wi.oid).Readdir() - - if err != nil { - return err - } - - for _, entry := range entries { - entryObjectID := entry.(object.HasObjectID).ObjectID() - _, isSubdir := entry.(fs.Directory) - - ctx.queue.add(&cleanupWorkItem{oid: entryObjectID, isDirectory: isSubdir, debug: wi.debug + "/" + entry.Metadata().Name}) - } - } - - return nil -} - -func runCleanupCommand(context *kingpin.ParseContext) error { - rep := mustOpenRepository(nil) - defer rep.Close() //nolint: errcheck - - mgr := snapshot.NewManager(rep) - - log.Printf("Listing active snapshots...") - snapshotNames := mgr.ListSnapshotManifests(nil) - - q := &cleanupWorkQueue{ - items: list.New(), - cond: sync.NewCond(&sync.Mutex{}), - visited: map[string]bool{}, - } - - ctx := &cleanupContext{ - repo: rep, - mgr: mgr, - inuse: map[string]bool{}, - visited: map[string]bool{}, - queue: q, - inuseCollector: make(chan string, 100), - } - - t0 := time.Now() - - log.Printf("Scanning active objects...") - workerCount := 32 - var wg sync.WaitGroup - wg.Add(workerCount) - - snapshots, err := mgr.LoadSnapshots(snapshotNames) - if err != nil { - return err - } - - for _, manifest := range snapshots { - ctx.queue.add(&cleanupWorkItem{manifest.RootObjectID, true, "root"}) - ctx.queue.add(&cleanupWorkItem{manifest.HashCacheID, false, "root-hashcache"}) - } - - _, _, queued := q.stats() - log.Printf("Found %v root objects.", queued) - - go func() { - for iu := range ctx.inuseCollector { - ctx.inuse[iu] = true - } - }() - - for i := 0; i < workerCount; i++ { - go func(workerID int) { - for wi, ok := ctx.queue.get(); ok; wi, ok = ctx.queue.get() { - if err := findAliveBlocks(ctx, wi); err != nil { - log.Warn().Err(err).Msg("can't find alive blocks") - } - ctx.queue.finished() - } - defer wg.Done() - }(i) - } - - var statsWaitGroup sync.WaitGroup - statsWaitGroup.Add(1) - cancelStats := make(chan bool) - - cutoffTime := time.Now().Add(-*cleanupIgnoreAge) - - go func() { - defer statsWaitGroup.Done() - - for { - select { - case <-cancelStats: - return - case <-time.After(1 * time.Second): - done, _, queued := q.stats() - log.Printf("Processed %v objects out of %v (%v objects/sec).", done, queued, int(float64(done)/time.Since(t0).Seconds())) - } - } - }() - - wg.Wait() - close(cancelStats) - - statsWaitGroup.Wait() - dt := time.Since(t0) - - log.Printf("Found %v in-use objects in %v blocks in %v", len(ctx.queue.visited), len(ctx.inuse), dt) - - if err := rep.Blocks.CompactIndexes(); err != nil { - log.Warn().Err(err).Msg("can't compact indexes") - } - - var totalBlocks int - var totalBytes int64 - - var ignoredBlocks int - var ignoredBytes int64 - - var inuseBlocks int - var inuseBytes int64 - - var unreferencedBlocks int - var unreferencedBytes int64 - - var physicalBlocksToDelete []string - - blocks, cancel := rep.Storage.ListBlocks("") - defer cancel() - for b := range blocks { - totalBlocks++ - totalBytes += b.Length - - if strings.HasPrefix(b.BlockID, "P") { - ignoredBlocks++ - ignoredBytes += b.Length - continue - } - - if !ctx.inuse[b.BlockID] { - if b.TimeStamp.After(cutoffTime) { - log.Printf("Ignored unreferenced block: %v (%v) at %v", b.BlockID, units.BytesStringBase10(b.Length), b.TimeStamp.Local()) - ignoredBlocks++ - ignoredBytes += b.Length - } else { - log.Printf("Unreferenced physical block: %v (%v) at %v", b.BlockID, units.BytesStringBase10(b.Length), b.TimeStamp.Local()) - unreferencedBlocks++ - unreferencedBytes += b.Length - - physicalBlocksToDelete = append(physicalBlocksToDelete, b.BlockID) - } - } else { - inuseBlocks++ - inuseBytes += b.Length - } - } - - log.Printf("Found %v (%v) total blocks.", totalBlocks, units.BytesStringBase10(totalBytes)) - log.Printf("Ignored %v blocks (%v).", ignoredBlocks, units.BytesStringBase10(ignoredBytes)) - log.Printf("In-use objects: %v, %v blocks (%v)", len(ctx.queue.visited), inuseBlocks, units.BytesStringBase10(inuseBytes)) - log.Printf("Unreferenced: %v blocks (%v)", unreferencedBlocks, units.BytesStringBase10(unreferencedBytes)) - - return nil -} - -func init() { - cleanupCommand.Action(runCleanupCommand) -}