mirror of
https://github.com/kopia/kopia.git
synced 2026-01-25 23:08:01 -05:00
Removed Warning, Notify and Fatal: * `Warning` => `Error` or `Info` * `Notify` => `Info` * `Fatal` was never used. Note that --log-level=warning is still supported for backwards compatibility, but it is the same as --log-level=error. Co-authored-by: Julio López <julio+gh@kasten.io>
320 lines
8.6 KiB
Go
320 lines
8.6 KiB
Go
package cli
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/kopia/kopia/internal/gather"
|
|
"github.com/kopia/kopia/internal/stats"
|
|
"github.com/kopia/kopia/internal/timetrack"
|
|
"github.com/kopia/kopia/internal/units"
|
|
"github.com/kopia/kopia/repo"
|
|
"github.com/kopia/kopia/repo/blob"
|
|
)
|
|
|
|
var (
|
|
repositorySyncCommand = repositoryCommands.Command("sync-to", "Synchronizes contents of this repository to another location")
|
|
repositorySyncUpdate = repositorySyncCommand.Flag("update", "Whether to update blobs present in destination and source if the source is newer.").Default("true").Bool()
|
|
repositorySyncDelete = repositorySyncCommand.Flag("delete", "Whether to delete blobs present in destination but not source.").Bool()
|
|
repositorySyncDryRun = repositorySyncCommand.Flag("dry-run", "Do not perform copying.").Short('n').Bool()
|
|
repositorySyncParallelism = repositorySyncCommand.Flag("parallel", "Copy parallelism.").Default("1").Int()
|
|
repositorySyncDestinationMustExist = repositorySyncCommand.Flag("must-exist", "Fail if destination does not have repository format blob.").Bool()
|
|
repositorySyncTimes = repositorySyncCommand.Flag("times", "Synchronize blob times if supported.").Bool()
|
|
)
|
|
|
|
const syncProgressInterval = 300 * time.Millisecond
|
|
|
|
func runSyncWithStorage(ctx context.Context, src blob.Reader, dst blob.Storage) error {
|
|
var ()
|
|
|
|
log(ctx).Infof("Synchronizing repositories:")
|
|
log(ctx).Infof(" Source: %v", src.DisplayName())
|
|
log(ctx).Infof(" Destination: %v", dst.DisplayName())
|
|
|
|
if !*repositorySyncDelete {
|
|
log(ctx).Infof("NOTE: By default no BLOBs are deleted, pass --delete to allow it.")
|
|
}
|
|
|
|
if err := ensureRepositoriesHaveSameFormatBlob(ctx, src, dst); err != nil {
|
|
return err
|
|
}
|
|
|
|
log(ctx).Infof("Looking for BLOBs to synchronize...")
|
|
|
|
var (
|
|
inSyncBlobs int
|
|
inSyncBytes int64
|
|
|
|
blobsToCopy []blob.Metadata
|
|
totalCopyBytes int64
|
|
|
|
blobsToDelete []blob.Metadata
|
|
totalDeleteBytes int64
|
|
|
|
srcBlobs int
|
|
totalSrcSize int64
|
|
)
|
|
|
|
dstMetadata, err := listDestinationBlobs(ctx, dst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
beginSyncProgress()
|
|
|
|
if err := src.ListBlobs(ctx, "", func(srcmd blob.Metadata) error {
|
|
totalSrcSize += srcmd.Length
|
|
|
|
dstmd, exists := dstMetadata[srcmd.BlobID]
|
|
delete(dstMetadata, srcmd.BlobID)
|
|
|
|
switch {
|
|
case !exists:
|
|
blobsToCopy = append(blobsToCopy, srcmd)
|
|
totalCopyBytes += srcmd.Length
|
|
case srcmd.Timestamp.After(dstmd.Timestamp) && *repositorySyncUpdate:
|
|
blobsToCopy = append(blobsToCopy, srcmd)
|
|
totalCopyBytes += srcmd.Length
|
|
default:
|
|
inSyncBlobs++
|
|
inSyncBytes += srcmd.Length
|
|
}
|
|
|
|
srcBlobs++
|
|
outputSyncProgress(fmt.Sprintf(" Found %v BLOBs (%v) in the source repository, %v (%v) to copy", srcBlobs, units.BytesStringBase10(totalSrcSize), len(blobsToCopy), units.BytesStringBase10(totalCopyBytes)))
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return errors.Wrap(err, "error listing blobs")
|
|
}
|
|
|
|
finishSyncProcess()
|
|
|
|
if *repositorySyncDelete {
|
|
for _, dstmd := range dstMetadata {
|
|
// found in dst, not in src since we were deleting from dst as we found a match.
|
|
blobsToDelete = append(blobsToDelete, dstmd)
|
|
totalDeleteBytes += dstmd.Length
|
|
}
|
|
}
|
|
|
|
log(ctx).Infof(
|
|
" Found %v BLOBs to delete (%v), %v in sync (%v)",
|
|
len(blobsToDelete), units.BytesStringBase10(totalDeleteBytes),
|
|
inSyncBlobs, units.BytesStringBase10(inSyncBytes),
|
|
)
|
|
|
|
if *repositorySyncDryRun {
|
|
return nil
|
|
}
|
|
|
|
log(ctx).Infof("Copying...")
|
|
|
|
beginSyncProgress()
|
|
|
|
finalErr := runSyncBlobs(ctx, src, dst, blobsToCopy, blobsToDelete, totalCopyBytes)
|
|
|
|
finishSyncProcess()
|
|
|
|
return finalErr
|
|
}
|
|
|
|
func listDestinationBlobs(ctx context.Context, dst blob.Storage) (map[blob.ID]blob.Metadata, error) {
|
|
dstTotalBytes := int64(0)
|
|
dstMetadata := map[blob.ID]blob.Metadata{}
|
|
|
|
beginSyncProgress()
|
|
|
|
if err := dst.ListBlobs(ctx, "", func(bm blob.Metadata) error {
|
|
dstMetadata[bm.BlobID] = bm
|
|
dstTotalBytes += bm.Length
|
|
outputSyncProgress(fmt.Sprintf(" Found %v BLOBs in the destination repository (%v)", len(dstMetadata), units.BytesStringBase10(dstTotalBytes)))
|
|
return nil
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "error listing BLOBs in destination repository")
|
|
}
|
|
|
|
finishSyncProcess()
|
|
|
|
return dstMetadata, nil
|
|
}
|
|
|
|
var (
|
|
lastSyncProgress string
|
|
syncProgressMutex sync.Mutex
|
|
nextSyncOutputTime timetrack.Throttle
|
|
)
|
|
|
|
func beginSyncProgress() {
|
|
lastSyncProgress = ""
|
|
|
|
nextSyncOutputTime.Reset()
|
|
}
|
|
|
|
func outputSyncProgress(s string) {
|
|
syncProgressMutex.Lock()
|
|
defer syncProgressMutex.Unlock()
|
|
|
|
if len(s) < len(lastSyncProgress) {
|
|
s += strings.Repeat(" ", len(lastSyncProgress)-len(s))
|
|
}
|
|
|
|
if nextSyncOutputTime.ShouldOutput(syncProgressInterval) {
|
|
printStderr("\r%v", s)
|
|
}
|
|
|
|
lastSyncProgress = s
|
|
}
|
|
|
|
func finishSyncProcess() {
|
|
printStderr("\r%v\n", lastSyncProgress)
|
|
}
|
|
|
|
func runSyncBlobs(ctx context.Context, src blob.Reader, dst blob.Storage, blobsToCopy, blobsToDelete []blob.Metadata, totalBytes int64) error {
|
|
eg, ctx := errgroup.WithContext(ctx)
|
|
copyCh := sliceToChannel(ctx, blobsToCopy)
|
|
deleteCh := sliceToChannel(ctx, blobsToDelete)
|
|
|
|
var progressMutex sync.Mutex
|
|
|
|
var totalCopied stats.CountSum
|
|
|
|
tt := timetrack.Start()
|
|
|
|
for i := 0; i < *repositorySyncParallelism; i++ {
|
|
workerID := i
|
|
|
|
eg.Go(func() error {
|
|
for m := range copyCh {
|
|
log(ctx).Debugf("[%v] Copying %v (%v bytes)...\n", workerID, m.BlobID, m.Length)
|
|
if err := syncCopyBlob(ctx, m, src, dst); err != nil {
|
|
return errors.Wrapf(err, "error copying %v", m.BlobID)
|
|
}
|
|
|
|
numBlobs, bytesCopied := totalCopied.Add(m.Length)
|
|
progressMutex.Lock()
|
|
eta := "unknown"
|
|
speed := "-"
|
|
|
|
if est, ok := tt.Estimate(float64(bytesCopied), float64(totalBytes)); ok {
|
|
eta = fmt.Sprintf("%v (%v)", est.Remaining, formatTimestamp(est.EstimatedEndTime))
|
|
speed = units.BitsPerSecondsString(est.SpeedPerSecond * 8) //nolint:gomnd
|
|
}
|
|
|
|
outputSyncProgress(
|
|
fmt.Sprintf(" Copied %v blobs (%v), Speed: %v, ETA: %v",
|
|
numBlobs, units.BytesStringBase10(bytesCopied), speed, eta))
|
|
progressMutex.Unlock()
|
|
}
|
|
|
|
for m := range deleteCh {
|
|
log(ctx).Debugf("[%v] Deleting %v (%v bytes)...\n", workerID, m.BlobID, m.Length)
|
|
if err := syncDeleteBlob(ctx, m, dst); err != nil {
|
|
return errors.Wrapf(err, "error deleting %v", m.BlobID)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
if err := eg.Wait(); err != nil {
|
|
return errors.Wrap(err, "error copying blobs")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func sliceToChannel(ctx context.Context, md []blob.Metadata) chan blob.Metadata {
|
|
ch := make(chan blob.Metadata)
|
|
|
|
go func() {
|
|
defer close(ch)
|
|
|
|
for _, it := range md {
|
|
select {
|
|
case ch <- it:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
var setTimeUnsupportedOnce sync.Once
|
|
|
|
func syncCopyBlob(ctx context.Context, m blob.Metadata, src blob.Reader, dst blob.Storage) error {
|
|
data, err := src.GetBlob(ctx, m.BlobID, 0, -1)
|
|
if err != nil {
|
|
if errors.Is(err, blob.ErrBlobNotFound) {
|
|
log(ctx).Infof("ignoring BLOB not found: %v", m.BlobID)
|
|
return nil
|
|
}
|
|
|
|
return errors.Wrapf(err, "error reading blob '%v' from source", m.BlobID)
|
|
}
|
|
|
|
if err := dst.PutBlob(ctx, m.BlobID, gather.FromSlice(data)); err != nil {
|
|
return errors.Wrapf(err, "error writing blob '%v' to destination", m.BlobID)
|
|
}
|
|
|
|
if *repositorySyncTimes {
|
|
if err := dst.SetTime(ctx, m.BlobID, m.Timestamp); err != nil {
|
|
if errors.Is(err, blob.ErrSetTimeUnsupported) {
|
|
setTimeUnsupportedOnce.Do(func() {
|
|
log(ctx).Infof("destination repository does not support setting time")
|
|
})
|
|
}
|
|
|
|
return errors.Wrapf(err, "error setting time on destination '%v'", m.BlobID)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func syncDeleteBlob(ctx context.Context, m blob.Metadata, dst blob.Storage) error {
|
|
err := dst.DeleteBlob(ctx, m.BlobID)
|
|
|
|
if errors.Is(err, blob.ErrBlobNotFound) {
|
|
return nil
|
|
}
|
|
|
|
return errors.Wrap(err, "error deleting blob")
|
|
}
|
|
|
|
func ensureRepositoriesHaveSameFormatBlob(ctx context.Context, src blob.Reader, dst blob.Storage) error {
|
|
srcData, err := src.GetBlob(ctx, repo.FormatBlobID, 0, -1)
|
|
if err != nil {
|
|
return errors.Wrap(err, "error reading format blob")
|
|
}
|
|
|
|
dstData, err := dst.GetBlob(ctx, repo.FormatBlobID, 0, -1)
|
|
if err != nil {
|
|
// target does not have format blob, save it there first.
|
|
if errors.Is(err, blob.ErrBlobNotFound) {
|
|
if *repositorySyncDestinationMustExist {
|
|
return errors.Errorf("destination repository does not have a format blob")
|
|
}
|
|
|
|
return dst.PutBlob(ctx, repo.FormatBlobID, gather.FromSlice(srcData))
|
|
}
|
|
|
|
return errors.Wrap(err, "error reading destination repository format blob")
|
|
}
|
|
|
|
if bytes.Equal(srcData, dstData) {
|
|
return nil
|
|
}
|
|
|
|
return errors.Errorf("destination repository contains incompatible data")
|
|
}
|