refactor(snapshots): refactored uploader into separate package (#4450)

This commit is contained in:
Jarek Kowalski
2025-03-14 15:48:31 -07:00
committed by GitHub
parent 9b68189d29
commit 51de24dcff
34 changed files with 237 additions and 206 deletions

View File

@@ -13,7 +13,7 @@
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
const (
@@ -30,15 +30,15 @@ type progressFlags struct {
func (p *progressFlags) setup(svc appServices, app *kingpin.Application) {
app.Flag("progress", "Enable progress bar").Hidden().Default("true").BoolVar(&p.enableProgress)
app.Flag("progress-estimation-type", "Set type of estimation of the data to be snapshotted").Hidden().Default(snapshotfs.EstimationTypeClassic).
EnumVar(&p.progressEstimationType, snapshotfs.EstimationTypeClassic, snapshotfs.EstimationTypeRough, snapshotfs.EstimationTypeAdaptive)
app.Flag("progress-estimation-type", "Set type of estimation of the data to be snapshotted").Hidden().Default(upload.EstimationTypeClassic).
EnumVar(&p.progressEstimationType, upload.EstimationTypeClassic, upload.EstimationTypeRough, upload.EstimationTypeAdaptive)
app.Flag("progress-update-interval", "How often to update progress information").Hidden().Default("300ms").DurationVar(&p.progressUpdateInterval)
app.Flag("adaptive-estimation-threshold", "Sets the threshold below which the classic estimation method will be used").Hidden().Default(strconv.FormatInt(snapshotfs.AdaptiveEstimationThreshold, 10)).Int64Var(&p.adaptiveEstimationThreshold)
app.Flag("adaptive-estimation-threshold", "Sets the threshold below which the classic estimation method will be used").Hidden().Default(strconv.FormatInt(upload.AdaptiveEstimationThreshold, 10)).Int64Var(&p.adaptiveEstimationThreshold)
p.out.setup(svc)
}
type cliProgress struct {
snapshotfs.NullUploadProgress
upload.NullUploadProgress
// all int64 must precede all int32 due to alignment requirements on ARM
uploadedBytes atomic.Int64
@@ -270,11 +270,11 @@ func (p *cliProgress) Finish() {
}
}
func (p *cliProgress) EstimationParameters() snapshotfs.EstimationParameters {
return snapshotfs.EstimationParameters{
func (p *cliProgress) EstimationParameters() upload.EstimationParameters {
return upload.EstimationParameters{
Type: p.progressEstimationType,
AdaptiveThreshold: p.adaptiveEstimationThreshold,
}
}
var _ snapshotfs.UploadProgress = (*cliProgress)(nil)
var _ upload.Progress = (*cliProgress)(nil)

View File

@@ -17,7 +17,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
const (
@@ -215,8 +215,8 @@ func validateStartEndTime(st, et string) error {
return nil
}
func (c *commandSnapshotCreate) setupUploader(rep repo.RepositoryWriter) *snapshotfs.Uploader {
u := snapshotfs.NewUploader(rep)
func (c *commandSnapshotCreate) setupUploader(rep repo.RepositoryWriter) *upload.Uploader {
u := upload.NewUploader(rep)
u.MaxUploadBytes = c.snapshotCreateCheckpointUploadLimitMB << 20 //nolint:mnd
if c.snapshotCreateForceEnableActions {
@@ -275,7 +275,7 @@ func (c *commandSnapshotCreate) snapshotSingleSource(
fsEntry fs.Entry,
setManual bool,
rep repo.RepositoryWriter,
u *snapshotfs.Uploader,
u *upload.Uploader,
sourceInfo snapshot.SourceInfo,
tags map[string]string,
st *notifydata.MultiSnapshotStatus,

View File

@@ -13,7 +13,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
type commandSnapshotEstimate struct {
@@ -39,8 +39,8 @@ func (c *commandSnapshotEstimate) setup(svc appServices, parent commandParent) {
type estimateProgress struct {
stats snapshot.Stats
included snapshotfs.SampleBuckets
excluded snapshotfs.SampleBuckets
included upload.SampleBuckets
excluded upload.SampleBuckets
excludedDirs []string
quiet bool
}
@@ -59,7 +59,7 @@ func (ep *estimateProgress) Error(ctx context.Context, filename string, err erro
}
}
func (ep *estimateProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded snapshotfs.SampleBuckets, excludedDirs []string, final bool) {
func (ep *estimateProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded upload.SampleBuckets, excludedDirs []string, final bool) {
_ = final
ep.stats = *st
@@ -99,7 +99,7 @@ func (c *commandSnapshotEstimate) run(ctx context.Context, rep repo.Repository)
return errors.Wrapf(err, "error creating policy tree for %v", sourceInfo)
}
if err := snapshotfs.Estimate(ctx, dir, policyTree, &ep, c.maxExamplesPerBucket); err != nil {
if err := upload.Estimate(ctx, dir, policyTree, &ep, c.maxExamplesPerBucket); err != nil {
return errors.Wrap(err, "error estimating")
}
@@ -137,7 +137,7 @@ func (c *commandSnapshotEstimate) run(ctx context.Context, rep repo.Repository)
return nil
}
func (c *commandSnapshotEstimate) showBuckets(buckets snapshotfs.SampleBuckets, showFiles bool) {
func (c *commandSnapshotEstimate) showBuckets(buckets upload.SampleBuckets, showFiles bool) {
for i, bucket := range buckets {
if bucket.Count == 0 {
continue

View File

@@ -12,6 +12,7 @@
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
type commandSnapshotMigrate struct {
@@ -63,7 +64,7 @@ func (c *commandSnapshotMigrate) run(ctx context.Context, destRepo repo.Reposito
wg sync.WaitGroup
mu sync.Mutex
canceled bool
activeUploaders = map[snapshot.SourceInfo]*snapshotfs.Uploader{}
activeUploaders = map[snapshot.SourceInfo]*upload.Uploader{}
)
c.svc.getProgress().StartShared()
@@ -104,7 +105,7 @@ func (c *commandSnapshotMigrate) run(ctx context.Context, destRepo repo.Reposito
break
}
uploader := snapshotfs.NewUploader(destRepo)
uploader := upload.NewUploader(destRepo)
uploader.Progress = c.svc.getProgress()
activeUploaders[s] = uploader
mu.Unlock()
@@ -220,7 +221,7 @@ func (c *commandSnapshotMigrate) findPreviousSnapshotManifestWithStartTime(ctx c
return nil, nil
}
func (c *commandSnapshotMigrate) migrateSingleSource(ctx context.Context, uploader *snapshotfs.Uploader, sourceRepo repo.Repository, destRepo repo.RepositoryWriter, s snapshot.SourceInfo) error {
func (c *commandSnapshotMigrate) migrateSingleSource(ctx context.Context, uploader *upload.Uploader, sourceRepo repo.Repository, destRepo repo.RepositoryWriter, s snapshot.SourceInfo) error {
manifests, err := snapshot.ListSnapshotManifests(ctx, sourceRepo, &s, nil)
if err != nil {
return errors.Wrapf(err, "error listing snapshot manifests for %v", s)
@@ -248,7 +249,7 @@ func (c *commandSnapshotMigrate) migrateSingleSource(ctx context.Context, upload
return nil
}
func (c *commandSnapshotMigrate) migrateSingleSourceSnapshot(ctx context.Context, uploader *snapshotfs.Uploader, sourceRepo repo.Repository, destRepo repo.RepositoryWriter, s snapshot.SourceInfo, m *snapshot.Manifest) error {
func (c *commandSnapshotMigrate) migrateSingleSourceSnapshot(ctx context.Context, uploader *upload.Uploader, sourceRepo repo.Repository, destRepo repo.RepositoryWriter, s snapshot.SourceInfo, m *snapshot.Manifest) error {
if m.IncompleteReason != "" {
log(ctx).Debugf("ignoring incomplete %v at %v", s, formatTimestamp(m.StartTime.ToTime()))
return nil

View File

@@ -13,7 +13,7 @@
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
func TestTimeFuncWiring(t *testing.T) {
@@ -84,7 +84,7 @@ func TestTimeFuncWiring(t *testing.T) {
sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
nt = ft.Advance(1 * time.Hour)
u := snapshotfs.NewUploader(env.RepositoryWriter)
u := upload.NewUploader(env.RepositoryWriter)
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
s1, err := u.Upload(ctx, sourceDir, policyTree, snapshot.SourceInfo{})

View File

@@ -17,7 +17,7 @@
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
type estimateTaskProgress struct {
@@ -36,7 +36,7 @@ func (p estimateTaskProgress) Error(ctx context.Context, dirname string, err err
}
}
func (p estimateTaskProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded snapshotfs.SampleBuckets, excludedDirs []string, final bool) {
func (p estimateTaskProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded upload.SampleBuckets, excludedDirs []string, final bool) {
_ = excludedDirs
_ = final
@@ -57,7 +57,7 @@ func (p estimateTaskProgress) Stats(ctx context.Context, st *snapshot.Stats, inc
}
}
func logBucketSamples(ctx context.Context, buckets snapshotfs.SampleBuckets, prefix string, showExamples bool) {
func logBucketSamples(ctx context.Context, buckets upload.SampleBuckets, prefix string, showExamples bool) {
hasAny := false
for i, bucket := range buckets {
@@ -97,7 +97,7 @@ func logBucketSamples(ctx context.Context, buckets snapshotfs.SampleBuckets, pre
}
}
var _ snapshotfs.EstimateProgress = estimateTaskProgress{}
var _ upload.EstimateProgress = estimateTaskProgress{}
func handleEstimate(ctx context.Context, rc requestContext) (interface{}, *apiError) {
var req serverapi.EstimateRequest
@@ -140,7 +140,7 @@ func handleEstimate(ctx context.Context, rc requestContext) (interface{}, *apiEr
ctrl.OnCancel(cancel)
return snapshotfs.Estimate(estimatectx, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket)
return upload.Estimate(estimatectx, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket)
})
taskID := <-taskIDChan

View File

@@ -21,7 +21,7 @@
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
func TestRestoreSnapshots(t *testing.T) {
@@ -32,7 +32,7 @@ func TestRestoreSnapshots(t *testing.T) {
var id11 manifest.ID
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{Purpose: "Test"}, func(ctx context.Context, w repo.RepositoryWriter) error {
u := snapshotfs.NewUploader(w)
u := upload.NewUploader(w)
dir1 := mockfs.NewDirectory()

View File

@@ -14,7 +14,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
func TestListAndDeleteSnapshots(t *testing.T) {
@@ -26,7 +26,7 @@ func TestListAndDeleteSnapshots(t *testing.T) {
var id11, id12, id13, id14, id21 manifest.ID
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{Purpose: "Test"}, func(ctx context.Context, w repo.RepositoryWriter) error {
u := snapshotfs.NewUploader(w)
u := upload.NewUploader(w)
dir1 := mockfs.NewDirectory()
@@ -187,7 +187,7 @@ func TestEditSnapshots(t *testing.T) {
var id11 manifest.ID
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{Purpose: "Test"}, func(ctx context.Context, w repo.RepositoryWriter) error {
u := snapshotfs.NewUploader(w)
u := upload.NewUploader(w)
dir1 := mockfs.NewDirectory()

View File

@@ -17,7 +17,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
const (
@@ -40,7 +40,7 @@ type sourceManagerServerInterface interface {
// - FAILED - inactive
// - UPLOADING - uploading a snapshot.
type sourceManager struct {
snapshotfs.NullUploadProgress
upload.NullUploadProgress
server sourceManagerServerInterface
@@ -52,7 +52,7 @@ type sourceManager struct {
sourceMutex sync.RWMutex
// +checklocks:sourceMutex
uploader *snapshotfs.Uploader
uploader *upload.Uploader
// +checklocks:sourceMutex
pol policy.SchedulingPolicy
// +checklocks:sourceMutex
@@ -74,7 +74,7 @@ type sourceManager struct {
// +checklocks:sourceMutex
isReadOnly bool
progress *snapshotfs.CountingUploadProgress
progress *upload.CountingUploadProgress
}
func (s *sourceManager) Status() *serverapi.SourceStatus {
@@ -139,14 +139,14 @@ func (s *sourceManager) setNextSnapshotTime(t time.Time) {
s.nextSnapshotTime = &t
}
func (s *sourceManager) currentUploader() *snapshotfs.Uploader {
func (s *sourceManager) currentUploader() *upload.Uploader {
s.sourceMutex.RLock()
defer s.sourceMutex.RUnlock()
return s.uploader
}
func (s *sourceManager) setUploader(u *snapshotfs.Uploader) {
func (s *sourceManager) setUploader(u *upload.Uploader) {
s.sourceMutex.Lock()
defer s.sourceMutex.Unlock()
@@ -351,7 +351,7 @@ func (s *sourceManager) snapshotInternal(ctx context.Context, ctrl uitask.Contro
}, func(ctx context.Context, w repo.RepositoryWriter) error {
log(ctx).Debugf("uploading %v", s.src)
u := snapshotfs.NewUploader(w)
u := upload.NewUploader(w)
ctrl.OnCancel(u.Cancel)
@@ -475,7 +475,7 @@ func (s *sourceManager) refreshStatus(ctx context.Context) {
type uitaskProgress struct {
nextReportTimeNanos atomic.Int64
p *snapshotfs.CountingUploadProgress
p *upload.CountingUploadProgress
ctrl uitask.Controller
}
@@ -586,7 +586,7 @@ func (t *uitaskProgress) EstimatedDataSize(fileCount, totalBytes int64) {
}
// EstimationParameters returns parameters to be used for estimation.
func (t *uitaskProgress) EstimationParameters() snapshotfs.EstimationParameters {
func (t *uitaskProgress) EstimationParameters() upload.EstimationParameters {
return t.p.EstimationParameters()
}
@@ -598,7 +598,7 @@ func newSourceManager(src snapshot.SourceInfo, server *Server, rep repo.Reposito
state: "UNKNOWN",
closed: make(chan struct{}),
snapshotRequests: make(chan struct{}, 1),
progress: &snapshotfs.CountingUploadProgress{},
progress: &upload.CountingUploadProgress{},
}
return m

View File

@@ -15,7 +15,7 @@
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
// StatusResponse is the response of 'status' HTTP API command.
@@ -52,13 +52,13 @@ type SourcesResponse struct {
// SourceStatus describes the status of a single source.
type SourceStatus struct {
Source snapshot.SourceInfo `json:"source"`
Status string `json:"status"`
SchedulingPolicy policy.SchedulingPolicy `json:"schedule"`
LastSnapshot *snapshot.Manifest `json:"lastSnapshot,omitempty"`
NextSnapshotTime *time.Time `json:"nextSnapshotTime,omitempty"`
UploadCounters *snapshotfs.UploadCounters `json:"upload,omitempty"`
CurrentTask string `json:"currentTask,omitempty"`
Source snapshot.SourceInfo `json:"source"`
Status string `json:"status"`
SchedulingPolicy policy.SchedulingPolicy `json:"schedule"`
LastSnapshot *snapshot.Manifest `json:"lastSnapshot,omitempty"`
NextSnapshotTime *time.Time `json:"nextSnapshotTime,omitempty"`
UploadCounters *upload.Counters `json:"upload,omitempty"`
CurrentTask string `json:"currentTask,omitempty"`
}
// PolicyListEntry describes single policy.

View File

@@ -118,6 +118,10 @@ func (b *DirManifestBuilder) Build(dirModTime fs.UTCTimestamp, incompleteReason
}
}
// isDir reports whether the given directory entry describes a directory.
func isDir(e *snapshot.DirEntry) bool {
	switch e.Type {
	case snapshot.EntryTypeDirectory:
		return true
	default:
		return false
	}
}
func sortedTopFailures(entries []*fs.EntryWithError) []*fs.EntryWithError {
sort.Slice(entries, func(i, j int) bool {
return entries[i].EntryPath < entries[j].EntryPath

View File

@@ -198,7 +198,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s
dm := builder.Build(entry.ModTime, entry.DirSummary.IncompleteReason)
oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, metadataComp)
oid, err := WriteDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, metadataComp)
if err != nil {
return nil, errors.Wrap(err, "unable to write directory manifest")
}

View File

@@ -12,7 +12,8 @@
"github.com/kopia/kopia/snapshot"
)
func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, metadataComp compression.Name) (object.ID, error) {
// WriteDirManifest writes a directory manifest to the repository and returns the object ID.
func WriteDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, metadataComp compression.Name) (object.ID, error) {
writer := rep.NewObjectWriter(ctx, object.WriterOptions{
Description: "DIR:" + dirRelativePath,
Prefix: objectIDPrefixDirectory,

View File

@@ -12,10 +12,13 @@
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
)
var repoFSLog = logging.Module("repofs")
// Well-known object ID prefixes.
const (
objectIDPrefixDirectory = "k"

View File

@@ -9,6 +9,7 @@
"github.com/kopia/kopia/internal/repotesting"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
func TestCalculateStorageStats(t *testing.T) {
@@ -29,7 +30,7 @@ func TestCalculateStorageStats(t *testing.T) {
Path: "/dummy",
}
u := snapshotfs.NewUploader(env.RepositoryWriter)
u := upload.NewUploader(env.RepositoryWriter)
man1, err := u.Upload(ctx, sourceRoot, nil, src)
require.NoError(t, err)
require.NoError(t, env.RepositoryWriter.Flush(ctx))

View File

@@ -14,6 +14,7 @@
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
func TestSnapshotTreeWalker(t *testing.T) {
@@ -46,7 +47,7 @@ func TestSnapshotTreeWalker(t *testing.T) {
// root directory, 2 subdirectories + 2 unique files (dir1/file11 === dir2/file22)
const numUniqueObjects = 5
u := snapshotfs.NewUploader(env.RepositoryWriter)
u := upload.NewUploader(env.RepositoryWriter)
man, err := u.Upload(ctx, sourceRoot, nil, snapshot.SourceInfo{})
require.NoError(t, err)
@@ -110,7 +111,7 @@ func TestSnapshotTreeWalker_Errors(t *testing.T) {
dir2.AddFile("file21", []byte{1, 2, 3, 4}, 0o644)
dir2.AddFile("file22", []byte{1, 2, 3}, 0o644) // same content as dir11/file11
u := snapshotfs.NewUploader(env.RepositoryWriter)
u := upload.NewUploader(env.RepositoryWriter)
man, err := u.Upload(ctx, sourceRoot, nil, snapshot.SourceInfo{})
require.NoError(t, err)
@@ -157,7 +158,7 @@ func TestSnapshotTreeWalker_MultipleErrors(t *testing.T) {
dir2.AddFile("file21", []byte{1, 2, 3, 4}, 0o644)
dir2.AddFile("file22", []byte{1, 2, 3, 4, 5}, 0o644)
u := snapshotfs.NewUploader(env.RepositoryWriter)
u := upload.NewUploader(env.RepositoryWriter)
man, err := u.Upload(ctx, sourceRoot, nil, snapshot.SourceInfo{})
require.NoError(t, err)
@@ -207,7 +208,7 @@ func TestSnapshotTreeWalker_MultipleErrorsSameOID(t *testing.T) {
dir2.AddFile("file21", []byte{1, 2, 3, 4}, 0o644)
dir2.AddFile("file22", []byte{1, 2, 3}, 0o644) // same content as dir11/file11
u := snapshotfs.NewUploader(env.RepositoryWriter)
u := upload.NewUploader(env.RepositoryWriter)
man, err := u.Upload(ctx, sourceRoot, nil, snapshot.SourceInfo{})
require.NoError(t, err)

View File

@@ -14,12 +14,13 @@
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
func TestSnapshotVerifier(t *testing.T) {
ctx, te := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
u := snapshotfs.NewUploader(te.RepositoryWriter)
u := upload.NewUploader(te.RepositoryWriter)
dir1 := mockfs.NewDirectory()
si1 := te.LocalPathSourceInfo("/dummy/path")

View File

@@ -0,0 +1,72 @@
package snapshotfs
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSafeNameForMount(t *testing.T) {
cases := map[string]string{
"/tmp/foo/bar": "tmp_foo_bar",
"/root": "root",
"/root/": "root",
"/": "__root",
"C:": "C",
"C:\\": "C",
"C:\\foo": "C_foo",
"C:\\foo/bar": "C_foo_bar",
"\\\\server\\root": "__server_root",
"\\\\server\\root\\": "__server_root",
"\\\\server\\root\\subdir": "__server_root_subdir",
"\\\\server\\root\\subdir/with/forward/slashes": "__server_root_subdir_with_forward_slashes",
"\\\\server\\root\\subdir/with\\mixed/slashes\\": "__server_root_subdir_with_mixed_slashes",
}
for input, want := range cases {
assert.Equal(t, want, safeNameForMount(input), input)
}
}
func TestDisambiguateSafeNames(t *testing.T) {
cases := []struct {
input map[string]string
want map[string]string
}{
{
input: map[string]string{
"c:/": "c",
"c:\\": "c",
"c:": "c",
"c": "c",
},
want: map[string]string{
"c": "c",
"c:": "c (2)",
"c:/": "c (3)",
"c:\\": "c (4)",
},
},
{
input: map[string]string{
"c:/": "c",
"c:\\": "c",
"c:": "c",
"c": "c",
"c (2)": "c (2)",
},
want: map[string]string{
"c": "c",
"c (2)": "c (2)",
"c:": "c (2) (2)",
"c:/": "c (3)",
"c:\\": "c (4)",
},
},
}
for _, tc := range cases {
require.Equal(t, tc.want, disambiguateSafeNames(tc.input))
}
}

View File

@@ -1,11 +1,10 @@
package snapshotfs
package snapshotfs_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/fs"
@@ -13,12 +12,14 @@
"github.com/kopia/kopia/internal/repotesting"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
func TestAllSources(t *testing.T) {
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
u := NewUploader(env.RepositoryWriter)
u := upload.NewUploader(env.RepositoryWriter)
man, err := u.Upload(ctx, mockfs.NewDirectory(), nil, snapshot.SourceInfo{Host: "dummy", UserName: "dummy", Path: "dummy"})
require.NoError(t, err)
@@ -48,7 +49,7 @@ func TestAllSources(t *testing.T) {
mustWriteSnapshotManifest(ctx, t, env.RepositoryWriter, snapshot.SourceInfo{UserName: m.user, Host: m.host, Path: m.path}, fs.UTCTimestampFromTime(ts), man)
}
as := AllSourcesEntry(env.RepositoryWriter)
as := snapshotfs.AllSourcesEntry(env.RepositoryWriter)
gotNames := iterateAllNames(ctx, t, as, "")
wantNames := map[string]struct{}{
"another-user@some-host/": {},
@@ -111,67 +112,3 @@ func mustWriteSnapshotManifest(ctx context.Context, t *testing.T, rep repo.Repos
_, err := snapshot.SaveSnapshot(ctx, rep, man)
require.NoError(t, err)
}
func TestSafeNameForMount(t *testing.T) {
cases := map[string]string{
"/tmp/foo/bar": "tmp_foo_bar",
"/root": "root",
"/root/": "root",
"/": "__root",
"C:": "C",
"C:\\": "C",
"C:\\foo": "C_foo",
"C:\\foo/bar": "C_foo_bar",
"\\\\server\\root": "__server_root",
"\\\\server\\root\\": "__server_root",
"\\\\server\\root\\subdir": "__server_root_subdir",
"\\\\server\\root\\subdir/with/forward/slashes": "__server_root_subdir_with_forward_slashes",
"\\\\server\\root\\subdir/with\\mixed/slashes\\": "__server_root_subdir_with_mixed_slashes",
}
for input, want := range cases {
assert.Equal(t, want, safeNameForMount(input), input)
}
}
func TestDisambiguateSafeNames(t *testing.T) {
cases := []struct {
input map[string]string
want map[string]string
}{
{
input: map[string]string{
"c:/": "c",
"c:\\": "c",
"c:": "c",
"c": "c",
},
want: map[string]string{
"c": "c",
"c:": "c (2)",
"c:/": "c (3)",
"c:\\": "c (4)",
},
},
{
input: map[string]string{
"c:/": "c",
"c:\\": "c",
"c:": "c",
"c": "c",
"c (2)": "c (2)",
},
want: map[string]string{
"c": "c",
"c (2)": "c (2)",
"c:": "c (2) (2)",
"c:/": "c (3)",
"c:\\": "c (4)",
},
},
}
for _, tc := range cases {
require.Equal(t, tc.want, disambiguateSafeNames(tc.input))
}
}

View File

@@ -10,7 +10,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
// Create snapshots an FS entry.
@@ -28,7 +28,7 @@ func createSnapshot(ctx context.Context, rep repo.RepositoryWriter, e fs.Entry,
return nil, err
}
u := snapshotfs.NewUploader(rep)
u := upload.NewUploader(rep)
manifest, err := u.Upload(ctx, e, policyTree, si, previous...)
if err != nil {

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"sync"
@@ -7,6 +7,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
// checkpointFunc is invoked when checkpoint occurs. The callback must checkpoint current state of
@@ -41,7 +42,7 @@ func (r *checkpointRegistry) removeCheckpointCallback(entryName string) {
// runCheckpoints invokes all registered checkpointers and adds results to the provided builder, while
// randomizing file names for non-directory entries. this is to prevent the use of checkpointed objects
// as authoritative on subsequent runs.
func (r *checkpointRegistry) runCheckpoints(checkpointBuilder *DirManifestBuilder) error {
func (r *checkpointRegistry) runCheckpoints(checkpointBuilder *snapshotfs.DirManifestBuilder) error {
r.mu.Lock()
defer r.mu.Unlock()

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"os"
@@ -9,6 +9,7 @@
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/mockfs"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
func TestCheckpointRegistry(t *testing.T) {
@@ -54,7 +55,7 @@ func TestCheckpointRegistry(t *testing.T) {
cp.removeCheckpointCallback(f3.Name())
cp.removeCheckpointCallback(f3.Name())
var dmb DirManifestBuilder
var dmb snapshotfs.DirManifestBuilder
dmb.AddEntry(&snapshot.DirEntry{
Name: "pre-existing",

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"context"

View File

@@ -1,4 +1,4 @@
package snapshotfs_test
package upload_test
import (
"context"
@@ -13,7 +13,7 @@
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
)
type fakeProgress struct {
@@ -31,7 +31,7 @@ func (p *fakeProgress) Error(context.Context, string, error, bool) {}
func (p *fakeProgress) Stats(
ctx context.Context,
s *snapshot.Stats,
includedFiles, excludedFiles snapshotfs.SampleBuckets,
includedFiles, excludedFiles upload.SampleBuckets,
excludedDirs []string,
final bool,
) {
@@ -62,6 +62,6 @@ func TestEstimate_SkipsStreamingDirectory(t *testing.T) {
expectedErrors: 0,
}
err := snapshotfs.Estimate(testlogging.Context(t), rootDir, policyTree, p, 1)
err := upload.Estimate(testlogging.Context(t), rootDir, policyTree, p, 1)
require.NoError(t, err)
}

View File

@@ -1,4 +1,5 @@
package snapshotfs
// Package upload manages snapshot uploads.
package upload
import (
"bytes"
@@ -30,15 +31,14 @@
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
// DefaultCheckpointInterval is the default frequency of mid-upload checkpointing.
const DefaultCheckpointInterval = 45 * time.Minute
var (
uploadLog = logging.Module("uploader")
repoFSLog = logging.Module("repofs")
uploadLog = logging.Module("uploader")
uploadTracer = otel.Tracer("upload")
)
@@ -64,7 +64,7 @@
type Uploader struct {
totalWrittenBytes atomic.Int64
Progress UploadProgress
Progress Progress
// automatically cancel the Upload after certain number of bytes
MaxUploadBytes int64
@@ -509,7 +509,7 @@ func (u *Uploader) uploadFileWithCheckpointing(ctx context.Context, relativePath
// checkpointRoot invokes checkpoints on the provided registry and if a checkpoint entry was generated,
// saves it in an incomplete snapshot manifest.
func (u *Uploader) checkpointRoot(ctx context.Context, cp *checkpointRegistry, prototypeManifest *snapshot.Manifest) error {
var dmbCheckpoint DirManifestBuilder
var dmbCheckpoint snapshotfs.DirManifestBuilder
if err := cp.runCheckpoints(&dmbCheckpoint); err != nil {
return errors.Wrap(err, "running checkpointers")
}
@@ -586,7 +586,7 @@ func (u *Uploader) periodicallyCheckpoint(ctx context.Context, cp *checkpointReg
// uploadDirWithCheckpointing uploads the specified Directory to the repository.
func (u *Uploader) uploadDirWithCheckpointing(ctx context.Context, rootDir fs.Directory, policyTree *policy.Tree, previousDirs []fs.Directory, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) {
var (
dmb DirManifestBuilder
dmb snapshotfs.DirManifestBuilder
cp checkpointRegistry
)
@@ -654,14 +654,10 @@ func rootCauseError(err error) error {
return err
}
func isDir(e *snapshot.DirEntry) bool {
return e.Type == snapshot.EntryTypeDirectory
}
func (u *Uploader) processChildren(
ctx context.Context,
parentDirCheckpointRegistry *checkpointRegistry,
parentDirBuilder *DirManifestBuilder,
parentDirBuilder *snapshotfs.DirManifestBuilder,
localDirPathOrEmpty, relativePath string,
dir fs.Directory,
policyTree *policy.Tree,
@@ -786,7 +782,7 @@ func (u *Uploader) effectiveParallelFileReads(pol *policy.Policy) int {
func (u *Uploader) processDirectoryEntries(
ctx context.Context,
parentCheckpointRegistry *checkpointRegistry,
parentDirBuilder *DirManifestBuilder,
parentDirBuilder *snapshotfs.DirManifestBuilder,
localDirPathOrEmpty string,
dirRelativePath string,
dir fs.Directory,
@@ -837,7 +833,7 @@ func (u *Uploader) processSingle(
ctx context.Context,
entry fs.Entry,
entryRelativePath string,
parentDirBuilder *DirManifestBuilder,
parentDirBuilder *snapshotfs.DirManifestBuilder,
policyTree *policy.Tree,
prevDirs []fs.Directory,
localDirPathOrEmpty string,
@@ -872,7 +868,7 @@ func (u *Uploader) processSingle(
switch entry := entry.(type) {
case fs.Directory:
childDirBuilder := &DirManifestBuilder{}
childDirBuilder := &snapshotfs.DirManifestBuilder{}
childLocalDirPathOrEmpty := ""
if localDirPathOrEmpty != "" {
@@ -958,7 +954,17 @@ func (u *Uploader) processSingle(
}
//nolint:unparam
func (u *Uploader) processEntryUploadResult(ctx context.Context, de *snapshot.DirEntry, err error, entryRelativePath string, parentDirBuilder *DirManifestBuilder, isIgnored bool, logDetail policy.LogDetail, logMessage string, t0 timetrack.Timer) error {
func (u *Uploader) processEntryUploadResult(
ctx context.Context,
de *snapshot.DirEntry,
err error,
entryRelativePath string,
parentDirBuilder *snapshotfs.DirManifestBuilder,
isIgnored bool,
logDetail policy.LogDetail,
logMessage string,
t0 timetrack.Timer,
) error {
if err != nil {
u.reportErrorAndMaybeCancel(err, isIgnored, parentDirBuilder, entryRelativePath)
} else {
@@ -1098,7 +1104,7 @@ func uploadDirInternal(
policyTree *policy.Tree,
previousDirs []fs.Directory,
localDirPathOrEmpty, dirRelativePath string,
thisDirBuilder *DirManifestBuilder,
thisDirBuilder *snapshotfs.DirManifestBuilder,
thisCheckpointRegistry *checkpointRegistry,
) (resultDE *snapshot.DirEntry, resultErr error) {
atomic.AddInt32(&u.stats.TotalDirectoryCount, 1)
@@ -1162,7 +1168,7 @@ func uploadDirInternal(
checkpointManifest := thisCheckpointBuilder.Build(fs.UTCTimestampFromTime(directory.ModTime()), IncompleteReasonCheckpoint)
oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, checkpointManifest, metadataComp)
oid, err := snapshotfs.WriteDirManifest(ctx, u.repo, dirRelativePath, checkpointManifest, metadataComp)
if err != nil {
return nil, errors.Wrap(err, "error writing dir manifest")
}
@@ -1177,7 +1183,7 @@ func uploadDirInternal(
dirManifest := thisDirBuilder.Build(fs.UTCTimestampFromTime(directory.ModTime()), u.incompleteReason())
oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, dirManifest, metadataComp)
oid, err := snapshotfs.WriteDirManifest(ctx, u.repo, dirRelativePath, dirManifest, metadataComp)
if err != nil {
return nil, errors.Wrapf(err, "error writing dir manifest: %v", directory.Name())
}
@@ -1185,7 +1191,7 @@ func uploadDirInternal(
return newDirEntryWithSummary(directory, oid, dirManifest.Summary)
}
func (u *Uploader) reportErrorAndMaybeCancel(err error, isIgnored bool, dmb *DirManifestBuilder, entryRelativePath string) {
func (u *Uploader) reportErrorAndMaybeCancel(err error, isIgnored bool, dmb *snapshotfs.DirManifestBuilder, entryRelativePath string) {
if u.IsCanceled() && errors.Is(err, errCanceled) {
// already canceled, do not report another.
return
@@ -1227,7 +1233,7 @@ func (u *Uploader) maybeOpenDirectoryFromManifest(ctx context.Context, man *snap
return nil
}
ent := EntryFromDirEntry(u.repo, man.RootEntry)
ent := snapshotfs.EntryFromDirEntry(u.repo, man.RootEntry)
dir, ok := ent.(fs.Directory)
if !ok {

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"bufio"

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"context"

View File

@@ -1,4 +1,4 @@
package snapshotfs_test
package upload_test
import (
"context"
@@ -10,7 +10,7 @@
vsi "github.com/kopia/kopia/internal/volumesizeinfo"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@@ -56,15 +56,15 @@ func getMockLogger() logging.Logger {
// withFailedVolumeSizeInfo returns EstimatorOption which ensures that GetVolumeSizeInfo will fail with provided error.
// Purposed for tests.
func withFailedVolumeSizeInfo(err error) snapshotfs.EstimatorOption {
return snapshotfs.WithVolumeSizeInfoFn(func(_ string) (vsi.VolumeSizeInfo, error) {
func withFailedVolumeSizeInfo(err error) upload.EstimatorOption {
return upload.WithVolumeSizeInfoFn(func(_ string) (vsi.VolumeSizeInfo, error) {
return vsi.VolumeSizeInfo{}, err
})
}
// withVolumeSizeInfo returns EstimatorOption which provides fake volume size.
func withVolumeSizeInfo(filesCount, usedFileSize, totalFileSize uint64) snapshotfs.EstimatorOption {
return snapshotfs.WithVolumeSizeInfoFn(func(_ string) (vsi.VolumeSizeInfo, error) {
func withVolumeSizeInfo(filesCount, usedFileSize, totalFileSize uint64) upload.EstimatorOption {
return upload.WithVolumeSizeInfoFn(func(_ string) (vsi.VolumeSizeInfo, error) {
return vsi.VolumeSizeInfo{
TotalSize: totalFileSize,
UsedSize: usedFileSize,
@@ -76,7 +76,7 @@ func withVolumeSizeInfo(filesCount, usedFileSize, totalFileSize uint64) snapshot
func expectSuccessfulEstimation(
ctx context.Context,
t *testing.T,
estimator snapshotfs.Estimator,
estimator upload.Estimator,
expectedNumberOfFiles,
expectedDataSize int64,
) {
@@ -121,7 +121,7 @@ func TestUploadEstimator(t *testing.T) {
logger := getMockLogger()
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
estimator := snapshotfs.NewEstimator(dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeClassic}, logger)
estimator := upload.NewEstimator(dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeClassic}, logger)
estimationCtx := context.Background()
expectSuccessfulEstimation(estimationCtx, t, estimator, expectedNumberOfFiles, expectedDataSize)
@@ -133,8 +133,8 @@ func TestUploadEstimator(t *testing.T) {
expectedDataSize := int64(2000)
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
estimator := snapshotfs.NewEstimator(
dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeRough}, logger,
estimator := upload.NewEstimator(
dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeRough}, logger,
withVolumeSizeInfo(uint64(expectedNumberOfFiles), uint64(expectedDataSize), 3000))
estimationCtx := context.Background()
@@ -145,8 +145,8 @@ func TestUploadEstimator(t *testing.T) {
logger := getMockLogger()
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
estimator := snapshotfs.NewEstimator(
dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeRough}, logger,
estimator := upload.NewEstimator(
dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeRough}, logger,
withFailedVolumeSizeInfo(errSimulated))
estimationCtx := context.Background()
@@ -162,9 +162,9 @@ func TestUploadEstimator(t *testing.T) {
expectedDataSize := int64(2000)
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
estimator := snapshotfs.NewEstimator(
estimator := upload.NewEstimator(
dir1, policyTree,
snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeAdaptive, AdaptiveThreshold: 100}, logger,
upload.EstimationParameters{Type: upload.EstimationTypeAdaptive, AdaptiveThreshold: 100}, logger,
withVolumeSizeInfo(uint64(expectedNumberOfFiles), uint64(expectedDataSize), 3000))
estimationCtx := context.Background()
@@ -175,9 +175,9 @@ func TestUploadEstimator(t *testing.T) {
logger := getMockLogger()
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
estimator := snapshotfs.NewEstimator(
estimator := upload.NewEstimator(
dir1, policyTree,
snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeAdaptive, AdaptiveThreshold: 10000}, logger,
upload.EstimationParameters{Type: upload.EstimationTypeAdaptive, AdaptiveThreshold: 10000}, logger,
withVolumeSizeInfo(uint64(1000), uint64(2000), 3000))
estimationCtx := context.Background()
@@ -188,8 +188,8 @@ func TestUploadEstimator(t *testing.T) {
logger := getMockLogger()
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
estimator := snapshotfs.NewEstimator(
dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeAdaptive, AdaptiveThreshold: 1}, logger,
estimator := upload.NewEstimator(
dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeAdaptive, AdaptiveThreshold: 1}, logger,
withFailedVolumeSizeInfo(errSimulated))
estimationCtx := context.Background()
@@ -214,7 +214,7 @@ func TestUploadEstimator(t *testing.T) {
logger := getMockLogger()
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
estimator := snapshotfs.NewEstimator(dir2, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeRough}, logger)
estimator := upload.NewEstimator(dir2, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeRough}, logger)
// In case of canceled context, we should get zeroes instead of estimated numbers
expectSuccessfulEstimation(testCtx, t, estimator, 0, 0)
@@ -229,7 +229,7 @@ func TestUploadEstimator(t *testing.T) {
logger := getMockLogger()
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
estimator := snapshotfs.NewEstimator(dir2, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeClassic}, logger)
estimator := upload.NewEstimator(dir2, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeClassic}, logger)
dir2.Subdir("d1").OnReaddir(func() {
estimator.Cancel()
@@ -248,7 +248,7 @@ func TestUploadEstimator(t *testing.T) {
}, policy.DefaultPolicy)
logger := getMockLogger()
estimator := snapshotfs.NewEstimator(dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeClassic}, logger)
estimator := upload.NewEstimator(dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeClassic}, logger)
expectSuccessfulEstimation(context.Background(), t, estimator, expectedNumberOfFiles-1, expectedDataSize-int64(len(file1Content)))
})

View File

@@ -1,7 +1,7 @@
//go:build !windows
// +build !windows
package snapshotfs
package upload
import (
"context"

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"context"

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"sync"
@@ -26,10 +26,10 @@ type EstimationParameters struct {
AdaptiveThreshold int64
}
// UploadProgress is invoked by uploader to report status of file and directory uploads.
// Progress is invoked by uploader to report status of file and directory uploads.
//
//nolint:interfacebloat
type UploadProgress interface {
type Progress interface {
// Enabled returns true when progress is enabled, false otherwise.
Enabled() bool
@@ -163,10 +163,10 @@ func (p *NullUploadProgress) EstimationParameters() EstimationParameters {
}
}
var _ UploadProgress = (*NullUploadProgress)(nil)
var _ Progress = (*NullUploadProgress)(nil)
// UploadCounters represents a snapshot of upload counters.
type UploadCounters struct {
// Counters represents a snapshot of upload counters.
type Counters struct {
// +checkatomic
TotalCachedBytes int64 `json:"cachedBytes"`
// +checkatomic
@@ -206,13 +206,13 @@ type CountingUploadProgress struct {
mu sync.Mutex
counters UploadCounters
counters Counters
}
// UploadStarted implements UploadProgress.
func (p *CountingUploadProgress) UploadStarted() {
// reset counters to all-zero values.
p.counters = UploadCounters{}
p.counters = Counters{}
}
// UploadedBytes implements UploadProgress.
@@ -289,11 +289,11 @@ func (p *CountingUploadProgress) StartedDirectory(dirname string) {
}
// Snapshot captures current snapshot of the upload.
func (p *CountingUploadProgress) Snapshot() UploadCounters {
func (p *CountingUploadProgress) Snapshot() Counters {
p.mu.Lock()
defer p.mu.Unlock()
return UploadCounters{
return Counters{
TotalCachedFiles: atomic.LoadInt32(&p.counters.TotalCachedFiles),
TotalHashedFiles: atomic.LoadInt32(&p.counters.TotalHashedFiles),
TotalCachedBytes: atomic.LoadInt64(&p.counters.TotalCachedBytes),
@@ -342,4 +342,4 @@ func (p *CountingUploadProgress) UITaskCounters(final bool) map[string]uitask.Co
return m
}
var _ UploadProgress = (*CountingUploadProgress)(nil)
var _ Progress = (*CountingUploadProgress)(nil)

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"context"

View File

@@ -1,4 +1,4 @@
package snapshotfs
package upload
import (
"bytes"
@@ -44,6 +44,7 @@
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
const (
@@ -284,7 +285,7 @@ func TestUploadMetadataCompression(t *testing.T) {
t.Errorf("Upload error: %v", err)
}
dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
entries := findAllEntries(t, ctx, dir)
verifyMetadataCompressor(t, ctx, th.repo, entries, compression.HeaderZstdFastest)
})
@@ -305,7 +306,7 @@ func TestUploadMetadataCompression(t *testing.T) {
t.Errorf("Upload error: %v", err)
}
dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
entries := findAllEntries(t, ctx, dir)
verifyMetadataCompressor(t, ctx, th.repo, entries, content.NoCompression)
})
@@ -326,7 +327,7 @@ func TestUploadMetadataCompression(t *testing.T) {
t.Errorf("Upload error: %v", err)
}
dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
entries := findAllEntries(t, ctx, dir)
verifyMetadataCompressor(t, ctx, th.repo, entries, compression.ByName["gzip"].HeaderID())
})
@@ -628,12 +629,12 @@ func TestUpload_SubDirectoryReadFailureSomeIgnoredNoFailFast(t *testing.T) {
}
type mockProgress struct {
UploadProgress
Progress
finishedFileCheck func(string, error)
}
func (mp *mockProgress) FinishedFile(relativePath string, err error) {
defer mp.UploadProgress.FinishedFile(relativePath, err)
defer mp.Progress.FinishedFile(relativePath, err)
mp.finishedFileCheck(relativePath, err)
}
@@ -657,7 +658,7 @@ func TestUpload_FinishedFileProgress(t *testing.T) {
u := NewUploader(th.repo)
u.ForceHashPercentage = 0
u.Progress = &mockProgress{
UploadProgress: u.Progress,
Progress: u.Progress,
finishedFileCheck: func(relativePath string, err error) {
defer func() {
mu.Lock()
@@ -1280,7 +1281,7 @@ func TestParallelUploadOfLargeFiles(t *testing.T) {
t.Logf("man: %v", man.RootObjectID())
dir := EntryFromDirEntry(th.repo, man.RootEntry).(fs.Directory)
dir := snapshotfs.EntryFromDirEntry(th.repo, man.RootEntry).(fs.Directory)
successCount := 0

View File

@@ -26,6 +26,7 @@
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/upload"
"github.com/kopia/kopia/tests/robustness"
)
@@ -112,7 +113,7 @@ func (kc *KopiaClient) SnapshotCreate(ctx context.Context, key string, val []byt
}
source := kc.getSourceForKeyVal(key, val)
u := snapshotfs.NewUploader(rw)
u := upload.NewUploader(rw)
man, err := u.Upload(ctx, source, policyTree, si)
if err != nil {