Files
kopia/cli/command_repository_sync.go
Jarek Kowalski e03971fc59 Upgraded linter to v1.33.0 (#734)
* linter: upgraded to 1.33, disabled some linters

* lint: fixed 'errorlint' errors

This ensures that all error comparisons use errors.Is() or errors.As().
We will be wrapping more errors going forward so it's important that
error checks are not strict everywhere.

Verified that there are no exceptions for errorlint linter which
guarantees that.

* lint: fixed or suppressed wrapcheck errors

* lint: nolintlint and misc cleanups

Co-authored-by: Julio López <julio+gh@kasten.io>
2020-12-21 22:39:22 -08:00

328 lines
9.0 KiB
Go

package cli
import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/stats"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
)
var (
repositorySyncCommand = repositoryCommands.Command("sync-to", "Synchronizes contents of this repository to another location")
repositorySyncUpdate = repositorySyncCommand.Flag("update", "Whether to update blobs present in destination and source if the source is newer.").Default("true").Bool()
repositorySyncDelete = repositorySyncCommand.Flag("delete", "Whether to delete blobs present in destination but not source.").Bool()
repositorySyncDryRun = repositorySyncCommand.Flag("dry-run", "Do not perform copying.").Short('n').Bool()
repositorySyncParallelism = repositorySyncCommand.Flag("parallel", "Copy parallelism.").Default("1").Int()
repositorySyncDestinationMustExist = repositorySyncCommand.Flag("must-exist", "Fail if destination does not have repository format blob.").Bool()
repositorySyncTimes = repositorySyncCommand.Flag("times", "Synchronize blob times if supported.").Bool()
)
const syncProgressInterval = 300 * time.Millisecond
func runSyncWithStorage(ctx context.Context, src, dst blob.Storage) error {
var ()
log(ctx).Infof("Synchronizing repositories:")
log(ctx).Infof(" Source: %v", src.DisplayName())
log(ctx).Infof(" Destination: %v", dst.DisplayName())
if !*repositorySyncDelete {
log(ctx).Noticef("NOTE: By default no BLOBs are deleted, pass --delete to allow it.")
}
if err := ensureRepositoriesHaveSameFormatBlob(ctx, src, dst); err != nil {
return err
}
log(ctx).Infof("Looking for BLOBs to synchronize...")
var (
inSyncBlobs int
inSyncBytes int64
blobsToCopy []blob.Metadata
totalCopyBytes int64
blobsToDelete []blob.Metadata
totalDeleteBytes int64
srcBlobs int
totalSrcSize int64
)
dstMetadata, err := listDestinationBlobs(ctx, dst)
if err != nil {
return err
}
beginSyncProgress()
if err := src.ListBlobs(ctx, "", func(srcmd blob.Metadata) error {
totalSrcSize += srcmd.Length
dstmd, exists := dstMetadata[srcmd.BlobID]
delete(dstMetadata, srcmd.BlobID)
switch {
case !exists:
blobsToCopy = append(blobsToCopy, srcmd)
totalCopyBytes += srcmd.Length
case srcmd.Timestamp.After(dstmd.Timestamp) && *repositorySyncUpdate:
blobsToCopy = append(blobsToCopy, srcmd)
totalCopyBytes += srcmd.Length
default:
inSyncBlobs++
inSyncBytes += srcmd.Length
}
srcBlobs++
outputSyncProgress(fmt.Sprintf(" Found %v BLOBs (%v) in the source repository, %v (%v) to copy", srcBlobs, units.BytesStringBase10(totalSrcSize), len(blobsToCopy), units.BytesStringBase10(totalCopyBytes)))
return nil
}); err != nil {
return errors.Wrap(err, "error listing blobs")
}
finishSyncProcess()
if *repositorySyncDelete {
for _, dstmd := range dstMetadata {
// found in dst, not in src since we were deleting from dst as we found a match.
blobsToDelete = append(blobsToDelete, dstmd)
totalDeleteBytes += dstmd.Length
}
}
log(ctx).Infof(
" Found %v BLOBs to delete (%v), %v in sync (%v)",
len(blobsToDelete), units.BytesStringBase10(totalDeleteBytes),
inSyncBlobs, units.BytesStringBase10(inSyncBytes),
)
if *repositorySyncDryRun {
return nil
}
log(ctx).Infof("Copying...")
beginSyncProgress()
finalErr := runSyncBlobs(ctx, src, dst, blobsToCopy, blobsToDelete, totalCopyBytes)
finishSyncProcess()
return finalErr
}
func listDestinationBlobs(ctx context.Context, dst blob.Storage) (map[blob.ID]blob.Metadata, error) {
dstTotalBytes := int64(0)
dstMetadata := map[blob.ID]blob.Metadata{}
beginSyncProgress()
if err := dst.ListBlobs(ctx, "", func(bm blob.Metadata) error {
dstMetadata[bm.BlobID] = bm
dstTotalBytes += bm.Length
outputSyncProgress(fmt.Sprintf(" Found %v BLOBs in the destination repository (%v)", len(dstMetadata), units.BytesStringBase10(dstTotalBytes)))
return nil
}); err != nil {
return nil, errors.Wrap(err, "error listing BLOBs in destination repository")
}
finishSyncProcess()
return dstMetadata, nil
}
var (
lastSyncProgress string
syncProgressMutex sync.Mutex
nextSyncOutputTime time.Time
)
func beginSyncProgress() {
lastSyncProgress = ""
nextSyncOutputTime = clock.Now()
}
func outputSyncProgress(s string) {
syncProgressMutex.Lock()
defer syncProgressMutex.Unlock()
if len(s) < len(lastSyncProgress) {
s += strings.Repeat(" ", len(lastSyncProgress)-len(s))
}
if clock.Now().After(nextSyncOutputTime) {
printStderr("\r%v", s)
nextSyncOutputTime = clock.Now().Add(syncProgressInterval)
}
lastSyncProgress = s
}
func finishSyncProcess() {
printStderr("\r%v\n", lastSyncProgress)
}
func runSyncBlobs(ctx context.Context, src, dst blob.Storage, blobsToCopy, blobsToDelete []blob.Metadata, totalBytes int64) error {
eg, ctx := errgroup.WithContext(ctx)
copyCh := sliceToChannel(ctx, blobsToCopy)
deleteCh := sliceToChannel(ctx, blobsToDelete)
var progressMutex sync.Mutex
var totalCopied stats.CountSum
startTime := clock.Now()
for i := 0; i < *repositorySyncParallelism; i++ {
workerID := i
eg.Go(func() error {
for m := range copyCh {
log(ctx).Debugf("[%v] Copying %v (%v bytes)...\n", workerID, m.BlobID, m.Length)
if err := syncCopyBlob(ctx, m, src, dst); err != nil {
return errors.Wrapf(err, "error copying %v", m.BlobID)
}
numBlobs, bytesCopied := totalCopied.Add(m.Length)
progressMutex.Lock()
percentage := float64(0)
eta := "unknown"
speed := "-"
elapsedTime := clock.Since(startTime)
if totalBytes > 0 {
percentage = hundredPercent * float64(bytesCopied) / float64(totalBytes)
if percentage > 0 {
totalTimeSeconds := elapsedTime.Seconds() * (hundredPercent / percentage)
etaTime := startTime.Add(time.Duration(totalTimeSeconds) * time.Second)
eta = fmt.Sprintf("%v (%v)", clock.Until(etaTime).Round(time.Second), formatTimestamp(etaTime))
bps := float64(bytesCopied) * 8 / elapsedTime.Seconds() //nolint:gomnd
speed = units.BitsPerSecondsString(bps)
}
}
outputSyncProgress(
fmt.Sprintf(" Copied %v blobs (%v), Speed: %v, ETA: %v",
numBlobs, units.BytesStringBase10(bytesCopied), speed, eta))
progressMutex.Unlock()
}
for m := range deleteCh {
log(ctx).Debugf("[%v] Deleting %v (%v bytes)...\n", workerID, m.BlobID, m.Length)
if err := syncDeleteBlob(ctx, m, dst); err != nil {
return errors.Wrapf(err, "error deleting %v", m.BlobID)
}
}
return nil
})
}
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "error copying blobs")
}
return nil
}
func sliceToChannel(ctx context.Context, md []blob.Metadata) chan blob.Metadata {
ch := make(chan blob.Metadata)
go func() {
defer close(ch)
for _, it := range md {
select {
case ch <- it:
case <-ctx.Done():
return
}
}
}()
return ch
}
var setTimeUnsupportedOnce sync.Once
func syncCopyBlob(ctx context.Context, m blob.Metadata, src, dst blob.Storage) error {
data, err := src.GetBlob(ctx, m.BlobID, 0, -1)
if err != nil {
if errors.Is(err, blob.ErrBlobNotFound) {
log(ctx).Infof("ignoring BLOB not found: %v", m.BlobID)
return nil
}
return errors.Wrapf(err, "error reading blob '%v' from source", m.BlobID)
}
if err := dst.PutBlob(ctx, m.BlobID, gather.FromSlice(data)); err != nil {
return errors.Wrapf(err, "error writing blob '%v' to destination", m.BlobID)
}
if *repositorySyncTimes {
if err := dst.SetTime(ctx, m.BlobID, m.Timestamp); err != nil {
if errors.Is(err, blob.ErrSetTimeUnsupported) {
setTimeUnsupportedOnce.Do(func() {
log(ctx).Warningf("destination repository does not support setting time")
})
}
return errors.Wrapf(err, "error setting time on destination '%v'", m.BlobID)
}
}
return nil
}
func syncDeleteBlob(ctx context.Context, m blob.Metadata, dst blob.Storage) error {
err := dst.DeleteBlob(ctx, m.BlobID)
if errors.Is(err, blob.ErrBlobNotFound) {
return nil
}
return errors.Wrap(err, "error deleting blob")
}
func ensureRepositoriesHaveSameFormatBlob(ctx context.Context, src, dst blob.Storage) error {
srcData, err := src.GetBlob(ctx, repo.FormatBlobID, 0, -1)
if err != nil {
return errors.Wrap(err, "error reading format blob")
}
dstData, err := dst.GetBlob(ctx, repo.FormatBlobID, 0, -1)
if err != nil {
// target does not have format blob, save it there first.
if errors.Is(err, blob.ErrBlobNotFound) {
if *repositorySyncDestinationMustExist {
return errors.Errorf("destination repository does not have a format blob")
}
return dst.PutBlob(ctx, repo.FormatBlobID, gather.FromSlice(srcData))
}
return errors.Wrap(err, "error reading destination repository format blob")
}
if bytes.Equal(srcData, dstData) {
return nil
}
return errors.Errorf("destination repository contains incompatible data")
}