From 51de24dcff72e50dbf51004543ef36fcb728f8ad Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 14 Mar 2025 15:48:31 -0700 Subject: [PATCH] refactor(snapshots): refactored uploader into separate package (#4450) --- cli/cli_progress.go | 16 ++-- cli/command_snapshot_create.go | 8 +- cli/command_snapshot_estimate.go | 12 +-- cli/command_snapshot_migrate.go | 9 ++- internal/repotesting/repotesting_test.go | 4 +- internal/server/api_estimate.go | 10 +-- internal/server/api_restore_test.go | 4 +- internal/server/api_snapshots_test.go | 6 +- internal/server/source_manager.go | 20 ++--- internal/serverapi/serverapi.go | 16 ++-- snapshot/snapshotfs/dir_manifest_builder.go | 4 + snapshot/snapshotfs/dir_rewriter.go | 2 +- snapshot/snapshotfs/dir_writer.go | 3 +- snapshot/snapshotfs/repofs.go | 3 + .../snapshotfs/snapshot_storage_stats_test.go | 3 +- .../snapshotfs/snapshot_tree_walker_test.go | 9 ++- snapshot/snapshotfs/snapshot_verifier_test.go | 3 +- .../source_directories_internal_test.go | 72 ++++++++++++++++++ .../snapshotfs/source_directories_test.go | 73 ++----------------- snapshot/snapshotmaintenance/helper_test.go | 4 +- .../checkpoint_registry.go | 5 +- .../checkpoint_registry_test.go | 5 +- snapshot/{snapshotfs => upload}/estimate.go | 2 +- .../{snapshotfs => upload}/estimate_test.go | 8 +- snapshot/{snapshotfs => upload}/upload.go | 48 ++++++------ .../{snapshotfs => upload}/upload_actions.go | 2 +- .../upload_estimator.go | 2 +- .../upload_estimator_test.go | 42 +++++------ .../upload_os_snapshot_nonwindows.go | 2 +- .../upload_os_snapshot_windows.go | 2 +- .../{snapshotfs => upload}/upload_progress.go | 22 +++--- .../{snapshotfs => upload}/upload_scan.go | 2 +- .../{snapshotfs => upload}/upload_test.go | 17 +++-- tests/tools/kopiaclient/kopiaclient.go | 3 +- 34 files changed, 237 insertions(+), 206 deletions(-) create mode 100644 snapshot/snapshotfs/source_directories_internal_test.go rename snapshot/{snapshotfs => upload}/checkpoint_registry.go (89%) rename snapshot/{snapshotfs => upload}/checkpoint_registry_test.go (95%) rename snapshot/{snapshotfs => upload}/estimate.go (99%) rename snapshot/{snapshotfs => upload}/estimate_test.go (87%) rename snapshot/{snapshotfs => upload}/upload.go (97%) rename snapshot/{snapshotfs => upload}/upload_actions.go (99%) rename snapshot/{snapshotfs => upload}/upload_estimator.go (99%) rename snapshot/{snapshotfs => upload}/upload_estimator_test.go (80%) rename snapshot/{snapshotfs => upload}/upload_os_snapshot_nonwindows.go (96%) rename snapshot/{snapshotfs => upload}/upload_os_snapshot_windows.go (99%) rename snapshot/{snapshotfs => upload}/upload_progress.go (95%) rename snapshot/{snapshotfs => upload}/upload_scan.go (97%) rename snapshot/{snapshotfs => upload}/upload_test.go (99%) diff --git a/cli/cli_progress.go b/cli/cli_progress.go index 7ff6904d3..0992e1cb9 100644 --- a/cli/cli_progress.go +++ b/cli/cli_progress.go @@ -13,7 +13,7 @@ "github.com/kopia/kopia/internal/timetrack" "github.com/kopia/kopia/internal/units" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) const ( @@ -30,15 +30,15 @@ type progressFlags struct { func (p *progressFlags) setup(svc appServices, app *kingpin.Application) { app.Flag("progress", "Enable progress bar").Hidden().Default("true").BoolVar(&p.enableProgress) - app.Flag("progress-estimation-type", "Set type of estimation of the data to be snapshotted").Hidden().Default(snapshotfs.EstimationTypeClassic). - EnumVar(&p.progressEstimationType, snapshotfs.EstimationTypeClassic, snapshotfs.EstimationTypeRough, snapshotfs.EstimationTypeAdaptive) + app.Flag("progress-estimation-type", "Set type of estimation of the data to be snapshotted").Hidden().Default(upload.EstimationTypeClassic). + EnumVar(&p.progressEstimationType, upload.EstimationTypeClassic, upload.EstimationTypeRough, upload.EstimationTypeAdaptive) app.Flag("progress-update-interval", "How often to update progress information").Hidden().Default("300ms").DurationVar(&p.progressUpdateInterval) - app.Flag("adaptive-estimation-threshold", "Sets the threshold below which the classic estimation method will be used").Hidden().Default(strconv.FormatInt(snapshotfs.AdaptiveEstimationThreshold, 10)).Int64Var(&p.adaptiveEstimationThreshold) + app.Flag("adaptive-estimation-threshold", "Sets the threshold below which the classic estimation method will be used").Hidden().Default(strconv.FormatInt(upload.AdaptiveEstimationThreshold, 10)).Int64Var(&p.adaptiveEstimationThreshold) p.out.setup(svc) } type cliProgress struct { - snapshotfs.NullUploadProgress + upload.NullUploadProgress // all int64 must precede all int32 due to alignment requirements on ARM uploadedBytes atomic.Int64 @@ -270,11 +270,11 @@ func (p *cliProgress) Finish() { } } -func (p *cliProgress) EstimationParameters() snapshotfs.EstimationParameters { - return snapshotfs.EstimationParameters{ +func (p *cliProgress) EstimationParameters() upload.EstimationParameters { + return upload.EstimationParameters{ Type: p.progressEstimationType, AdaptiveThreshold: p.adaptiveEstimationThreshold, } } -var _ snapshotfs.UploadProgress = (*cliProgress)(nil) +var _ upload.Progress = (*cliProgress)(nil) diff --git a/cli/command_snapshot_create.go b/cli/command_snapshot_create.go index 5828684dd..3fb87f53d 100644 --- a/cli/command_snapshot_create.go +++ b/cli/command_snapshot_create.go @@ -17,7 +17,7 @@ "github.com/kopia/kopia/repo" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) const ( @@ -215,8 +215,8 @@ func validateStartEndTime(st, et string) error { return nil } -func (c *commandSnapshotCreate) setupUploader(rep repo.RepositoryWriter) *snapshotfs.Uploader { - u := snapshotfs.NewUploader(rep) +func (c *commandSnapshotCreate) setupUploader(rep repo.RepositoryWriter) *upload.Uploader { + u := upload.NewUploader(rep) u.MaxUploadBytes = c.snapshotCreateCheckpointUploadLimitMB << 20 //nolint:mnd if c.snapshotCreateForceEnableActions { @@ -275,7 +275,7 @@ func (c *commandSnapshotCreate) snapshotSingleSource( fsEntry fs.Entry, setManual bool, rep repo.RepositoryWriter, - u *snapshotfs.Uploader, + u *upload.Uploader, sourceInfo snapshot.SourceInfo, tags map[string]string, st *notifydata.MultiSnapshotStatus, diff --git a/cli/command_snapshot_estimate.go b/cli/command_snapshot_estimate.go index 7c0b4f794..7be3fac93 100644 --- a/cli/command_snapshot_estimate.go +++ b/cli/command_snapshot_estimate.go @@ -13,7 +13,7 @@ "github.com/kopia/kopia/repo" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) type commandSnapshotEstimate struct { @@ -39,8 +39,8 @@ func (c *commandSnapshotEstimate) setup(svc appServices, parent commandParent) { type estimateProgress struct { stats snapshot.Stats - included snapshotfs.SampleBuckets - excluded snapshotfs.SampleBuckets + included upload.SampleBuckets + excluded upload.SampleBuckets excludedDirs []string quiet bool } @@ -59,7 +59,7 @@ func (ep *estimateProgress) Error(ctx context.Context, filename string, err erro } } -func (ep *estimateProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded snapshotfs.SampleBuckets, excludedDirs []string, final bool) { +func (ep *estimateProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded upload.SampleBuckets, excludedDirs []string, final bool) { _ = final ep.stats = *st @@ -99,7 +99,7 @@ func (c *commandSnapshotEstimate) run(ctx context.Context, rep repo.Repository) return errors.Wrapf(err, "error creating policy tree for %v", sourceInfo) } - if err := snapshotfs.Estimate(ctx, dir, policyTree, &ep, c.maxExamplesPerBucket); err != nil { + if err := upload.Estimate(ctx, dir, policyTree, &ep, c.maxExamplesPerBucket); err != nil { return errors.Wrap(err, "error estimating") } @@ -137,7 +137,7 @@ func (c *commandSnapshotEstimate) run(ctx context.Context, rep repo.Repository) return nil } -func (c *commandSnapshotEstimate) showBuckets(buckets snapshotfs.SampleBuckets, showFiles bool) { +func (c *commandSnapshotEstimate) showBuckets(buckets upload.SampleBuckets, showFiles bool) { for i, bucket := range buckets { if bucket.Count == 0 { continue diff --git a/cli/command_snapshot_migrate.go b/cli/command_snapshot_migrate.go index d44c455d9..eb04586bb 100644 --- a/cli/command_snapshot_migrate.go +++ b/cli/command_snapshot_migrate.go @@ -12,6 +12,7 @@ "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) type commandSnapshotMigrate struct { @@ -63,7 +64,7 @@ func (c *commandSnapshotMigrate) run(ctx context.Context, destRepo repo.Reposito wg sync.WaitGroup mu sync.Mutex canceled bool - activeUploaders = map[snapshot.SourceInfo]*snapshotfs.Uploader{} + activeUploaders = map[snapshot.SourceInfo]*upload.Uploader{} ) c.svc.getProgress().StartShared() @@ -104,7 +105,7 @@ func (c *commandSnapshotMigrate) run(ctx context.Context, destRepo repo.Reposito break } - uploader := snapshotfs.NewUploader(destRepo) + uploader := upload.NewUploader(destRepo) uploader.Progress = c.svc.getProgress() activeUploaders[s] = uploader mu.Unlock() @@ -220,7 +221,7 @@ func (c *commandSnapshotMigrate) findPreviousSnapshotManifestWithStartTime(ctx c return nil, nil } -func (c *commandSnapshotMigrate) migrateSingleSource(ctx context.Context, uploader *snapshotfs.Uploader, sourceRepo repo.Repository, destRepo repo.RepositoryWriter, s snapshot.SourceInfo) error { +func (c *commandSnapshotMigrate) migrateSingleSource(ctx context.Context, uploader *upload.Uploader, sourceRepo repo.Repository, destRepo repo.RepositoryWriter, s snapshot.SourceInfo) error { manifests, err := snapshot.ListSnapshotManifests(ctx, sourceRepo, &s, nil) if err != nil { return errors.Wrapf(err, "error listing snapshot manifests for %v", s) @@ -248,7 +249,7 @@ func (c *commandSnapshotMigrate) migrateSingleSource(ctx context.Context, upload return nil } -func (c *commandSnapshotMigrate) migrateSingleSourceSnapshot(ctx context.Context, uploader *snapshotfs.Uploader, sourceRepo repo.Repository, destRepo repo.RepositoryWriter, s snapshot.SourceInfo, m *snapshot.Manifest) error { +func (c *commandSnapshotMigrate) migrateSingleSourceSnapshot(ctx context.Context, uploader *upload.Uploader, sourceRepo repo.Repository, destRepo repo.RepositoryWriter, s snapshot.SourceInfo, m *snapshot.Manifest) error { if m.IncompleteReason != "" { log(ctx).Debugf("ignoring incomplete %v at %v", s, formatTimestamp(m.StartTime.ToTime())) return nil diff --git a/internal/repotesting/repotesting_test.go b/internal/repotesting/repotesting_test.go index 355bc9dd1..e4b6fc224 100644 --- a/internal/repotesting/repotesting_test.go +++ b/internal/repotesting/repotesting_test.go @@ -13,7 +13,7 @@ "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) func TestTimeFuncWiring(t *testing.T) { @@ -84,7 +84,7 @@ func TestTimeFuncWiring(t *testing.T) { sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions) nt = ft.Advance(1 * time.Hour) - u := snapshotfs.NewUploader(env.RepositoryWriter) + u := upload.NewUploader(env.RepositoryWriter) policyTree := policy.BuildTree(nil, policy.DefaultPolicy) s1, err := u.Upload(ctx, sourceDir, policyTree, snapshot.SourceInfo{}) diff --git a/internal/server/api_estimate.go b/internal/server/api_estimate.go index bdd0339e9..2705638ca 100644 --- a/internal/server/api_estimate.go +++ b/internal/server/api_estimate.go @@ -17,7 +17,7 @@ "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) type estimateTaskProgress struct { @@ -36,7 +36,7 @@ func (p estimateTaskProgress) Error(ctx context.Context, dirname string, err err } } -func (p estimateTaskProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded snapshotfs.SampleBuckets, excludedDirs []string, final bool) { +func (p estimateTaskProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded upload.SampleBuckets, excludedDirs []string, final bool) { _ = excludedDirs _ = final @@ -57,7 +57,7 @@ func (p estimateTaskProgress) Stats(ctx context.Context, st *snapshot.Stats, inc } } -func logBucketSamples(ctx context.Context, buckets snapshotfs.SampleBuckets, prefix string, showExamples bool) { +func logBucketSamples(ctx context.Context, buckets upload.SampleBuckets, prefix string, showExamples bool) { hasAny := false for i, bucket := range buckets { @@ -97,7 +97,7 @@ func logBucketSamples(ctx context.Context, buckets snapshotfs.SampleBuckets, pre } } -var _ snapshotfs.EstimateProgress = estimateTaskProgress{} +var _ upload.EstimateProgress = estimateTaskProgress{} func handleEstimate(ctx context.Context, rc requestContext) (interface{}, *apiError) { var req serverapi.EstimateRequest @@ -140,7 +140,7 @@ func handleEstimate(ctx context.Context, rc requestContext) (interface{}, *apiEr ctrl.OnCancel(cancel) - return snapshotfs.Estimate(estimatectx, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket) + return upload.Estimate(estimatectx, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket) }) taskID := <-taskIDChan diff --git a/internal/server/api_restore_test.go b/internal/server/api_restore_test.go index 2a1c01f37..96f8bf9b1 100644 --- a/internal/server/api_restore_test.go +++ b/internal/server/api_restore_test.go @@ -21,7 +21,7 @@ "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/restore" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) func TestRestoreSnapshots(t *testing.T) { @@ -32,7 +32,7 @@ func TestRestoreSnapshots(t *testing.T) { var id11 manifest.ID require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{Purpose: "Test"}, func(ctx context.Context, w repo.RepositoryWriter) error { - u := snapshotfs.NewUploader(w) + u := upload.NewUploader(w) dir1 := mockfs.NewDirectory() diff --git a/internal/server/api_snapshots_test.go b/internal/server/api_snapshots_test.go index 914ac031d..00977a129 100644 --- a/internal/server/api_snapshots_test.go +++ b/internal/server/api_snapshots_test.go @@ -14,7 +14,7 @@ "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) func TestListAndDeleteSnapshots(t *testing.T) { @@ -26,7 +26,7 @@ func TestListAndDeleteSnapshots(t *testing.T) { var id11, id12, id13, id14, id21 manifest.ID require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{Purpose: "Test"}, func(ctx context.Context, w repo.RepositoryWriter) error { - u := snapshotfs.NewUploader(w) + u := upload.NewUploader(w) dir1 := mockfs.NewDirectory() @@ -187,7 +187,7 @@ func TestEditSnapshots(t *testing.T) { var id11 manifest.ID require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{Purpose: "Test"}, func(ctx context.Context, w repo.RepositoryWriter) error { - u := snapshotfs.NewUploader(w) + u := upload.NewUploader(w) dir1 := mockfs.NewDirectory() diff --git a/internal/server/source_manager.go b/internal/server/source_manager.go index e50c09e50..38f4364f6 100644 --- a/internal/server/source_manager.go +++ b/internal/server/source_manager.go @@ -17,7 +17,7 @@ "github.com/kopia/kopia/repo" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) const ( @@ -40,7 +40,7 @@ type sourceManagerServerInterface interface { // - FAILED - inactive // - UPLOADING - uploading a snapshot. type sourceManager struct { - snapshotfs.NullUploadProgress + upload.NullUploadProgress server sourceManagerServerInterface @@ -52,7 +52,7 @@ type sourceManager struct { sourceMutex sync.RWMutex // +checklocks:sourceMutex - uploader *snapshotfs.Uploader + uploader *upload.Uploader // +checklocks:sourceMutex pol policy.SchedulingPolicy // +checklocks:sourceMutex @@ -74,7 +74,7 @@ type sourceManager struct { // +checklocks:sourceMutex isReadOnly bool - progress *snapshotfs.CountingUploadProgress + progress *upload.CountingUploadProgress } func (s *sourceManager) Status() *serverapi.SourceStatus { @@ -139,14 +139,14 @@ func (s *sourceManager) setNextSnapshotTime(t time.Time) { s.nextSnapshotTime = &t } -func (s *sourceManager) currentUploader() *snapshotfs.Uploader { +func (s *sourceManager) currentUploader() *upload.Uploader { s.sourceMutex.RLock() defer s.sourceMutex.RUnlock() return s.uploader } -func (s *sourceManager) setUploader(u *snapshotfs.Uploader) { +func (s *sourceManager) setUploader(u *upload.Uploader) { s.sourceMutex.Lock() defer s.sourceMutex.Unlock() @@ -351,7 +351,7 @@ func (s *sourceManager) snapshotInternal(ctx context.Context, ctrl uitask.Contro }, func(ctx context.Context, w repo.RepositoryWriter) error { log(ctx).Debugf("uploading %v", s.src) - u := snapshotfs.NewUploader(w) + u := upload.NewUploader(w) ctrl.OnCancel(u.Cancel) @@ -475,7 +475,7 @@ func (s *sourceManager) refreshStatus(ctx context.Context) { type uitaskProgress struct { nextReportTimeNanos atomic.Int64 - p *snapshotfs.CountingUploadProgress + p *upload.CountingUploadProgress ctrl uitask.Controller } @@ -586,7 +586,7 @@ func (t *uitaskProgress) EstimatedDataSize(fileCount, totalBytes int64) { } // EstimationParameters returns parameters to be used for estimation. -func (t *uitaskProgress) EstimationParameters() snapshotfs.EstimationParameters { +func (t *uitaskProgress) EstimationParameters() upload.EstimationParameters { return t.p.EstimationParameters() } @@ -598,7 +598,7 @@ func newSourceManager(src snapshot.SourceInfo, server *Server, rep repo.Reposito state: "UNKNOWN", closed: make(chan struct{}), snapshotRequests: make(chan struct{}, 1), - progress: &snapshotfs.CountingUploadProgress{}, + progress: &upload.CountingUploadProgress{}, } return m diff --git a/internal/serverapi/serverapi.go b/internal/serverapi/serverapi.go index 5c1c7a81e..d0adda342 100644 --- a/internal/serverapi/serverapi.go +++ b/internal/serverapi/serverapi.go @@ -15,7 +15,7 @@ "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" "github.com/kopia/kopia/snapshot/restore" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) // StatusResponse is the response of 'status' HTTP API command. @@ -52,13 +52,13 @@ type SourcesResponse struct { // SourceStatus describes the status of a single source. type SourceStatus struct { - Source snapshot.SourceInfo `json:"source"` - Status string `json:"status"` - SchedulingPolicy policy.SchedulingPolicy `json:"schedule"` - LastSnapshot *snapshot.Manifest `json:"lastSnapshot,omitempty"` - NextSnapshotTime *time.Time `json:"nextSnapshotTime,omitempty"` - UploadCounters *snapshotfs.UploadCounters `json:"upload,omitempty"` - CurrentTask string `json:"currentTask,omitempty"` + Source snapshot.SourceInfo `json:"source"` + Status string `json:"status"` + SchedulingPolicy policy.SchedulingPolicy `json:"schedule"` + LastSnapshot *snapshot.Manifest `json:"lastSnapshot,omitempty"` + NextSnapshotTime *time.Time `json:"nextSnapshotTime,omitempty"` + UploadCounters *upload.Counters `json:"upload,omitempty"` + CurrentTask string `json:"currentTask,omitempty"` } // PolicyListEntry describes single policy. diff --git a/snapshot/snapshotfs/dir_manifest_builder.go b/snapshot/snapshotfs/dir_manifest_builder.go index 875e0c25c..d89a6a820 100644 --- a/snapshot/snapshotfs/dir_manifest_builder.go +++ b/snapshot/snapshotfs/dir_manifest_builder.go @@ -118,6 +118,10 @@ func (b *DirManifestBuilder) Build(dirModTime fs.UTCTimestamp, incompleteReason } } +func isDir(e *snapshot.DirEntry) bool { + return e.Type == snapshot.EntryTypeDirectory +} + func sortedTopFailures(entries []*fs.EntryWithError) []*fs.EntryWithError { sort.Slice(entries, func(i, j int) bool { return entries[i].EntryPath < entries[j].EntryPath diff --git a/snapshot/snapshotfs/dir_rewriter.go b/snapshot/snapshotfs/dir_rewriter.go index ab41d463f..d550ac2fa 100644 --- a/snapshot/snapshotfs/dir_rewriter.go +++ b/snapshot/snapshotfs/dir_rewriter.go @@ -198,7 +198,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s dm := builder.Build(entry.ModTime, entry.DirSummary.IncompleteReason) - oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, metadataComp) + oid, err := WriteDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, metadataComp) if err != nil { return nil, errors.Wrap(err, "unable to write directory manifest") } diff --git a/snapshot/snapshotfs/dir_writer.go b/snapshot/snapshotfs/dir_writer.go index 39556547a..1b8970628 100644 --- a/snapshot/snapshotfs/dir_writer.go +++ b/snapshot/snapshotfs/dir_writer.go @@ -12,7 +12,8 @@ "github.com/kopia/kopia/snapshot" ) -func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, metadataComp compression.Name) (object.ID, error) { +// WriteDirManifest writes a directory manifest to the repository and returns the object ID. +func WriteDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, metadataComp compression.Name) (object.ID, error) { writer := rep.NewObjectWriter(ctx, object.WriterOptions{ Description: "DIR:" + dirRelativePath, Prefix: objectIDPrefixDirectory, diff --git a/snapshot/snapshotfs/repofs.go b/snapshot/snapshotfs/repofs.go index cb99eaa65..53dda798c 100644 --- a/snapshot/snapshotfs/repofs.go +++ b/snapshot/snapshotfs/repofs.go @@ -12,10 +12,13 @@ "github.com/kopia/kopia/fs" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" ) +var repoFSLog = logging.Module("repofs") + // Well-known object ID prefixes. const ( objectIDPrefixDirectory = "k" diff --git a/snapshot/snapshotfs/snapshot_storage_stats_test.go b/snapshot/snapshotfs/snapshot_storage_stats_test.go index 41aa98f34..67ae44ab5 100644 --- a/snapshot/snapshotfs/snapshot_storage_stats_test.go +++ b/snapshot/snapshotfs/snapshot_storage_stats_test.go @@ -9,6 +9,7 @@ "github.com/kopia/kopia/internal/repotesting" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) func TestCalculateStorageStats(t *testing.T) { @@ -29,7 +30,7 @@ func TestCalculateStorageStats(t *testing.T) { Path: "/dummy", } - u := snapshotfs.NewUploader(env.RepositoryWriter) + u := upload.NewUploader(env.RepositoryWriter) man1, err := u.Upload(ctx, sourceRoot, nil, src) require.NoError(t, err) require.NoError(t, env.RepositoryWriter.Flush(ctx)) diff --git a/snapshot/snapshotfs/snapshot_tree_walker_test.go b/snapshot/snapshotfs/snapshot_tree_walker_test.go index e6951eb82..cec5be72e 100644 --- a/snapshot/snapshotfs/snapshot_tree_walker_test.go +++ b/snapshot/snapshotfs/snapshot_tree_walker_test.go @@ -14,6 +14,7 @@ "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) func TestSnapshotTreeWalker(t *testing.T) { @@ -46,7 +47,7 @@ func TestSnapshotTreeWalker(t *testing.T) { // root directory, 2 subdirectories + 2 unique files (dir1/file11 === dir2/file22) const numUniqueObjects = 5 - u := snapshotfs.NewUploader(env.RepositoryWriter) + u := upload.NewUploader(env.RepositoryWriter) man, err := u.Upload(ctx, sourceRoot, nil, snapshot.SourceInfo{}) require.NoError(t, err) @@ -110,7 +111,7 @@ func TestSnapshotTreeWalker_Errors(t *testing.T) { dir2.AddFile("file21", []byte{1, 2, 3, 4}, 0o644) dir2.AddFile("file22", []byte{1, 2, 3}, 0o644) // same content as dir11/file11 - u := snapshotfs.NewUploader(env.RepositoryWriter) + u := upload.NewUploader(env.RepositoryWriter) man, err := u.Upload(ctx, sourceRoot, nil, snapshot.SourceInfo{}) require.NoError(t, err) @@ -157,7 +158,7 @@ func TestSnapshotTreeWalker_MultipleErrors(t *testing.T) { dir2.AddFile("file21", []byte{1, 2, 3, 4}, 0o644) dir2.AddFile("file22", []byte{1, 2, 3, 4, 5}, 0o644) - u := snapshotfs.NewUploader(env.RepositoryWriter) + u := upload.NewUploader(env.RepositoryWriter) man, err := u.Upload(ctx, sourceRoot, nil, snapshot.SourceInfo{}) require.NoError(t, err) @@ -207,7 +208,7 @@ func TestSnapshotTreeWalker_MultipleErrorsSameOID(t *testing.T) { dir2.AddFile("file21", []byte{1, 2, 3, 4}, 0o644) dir2.AddFile("file22", []byte{1, 2, 3}, 0o644) // same content as dir11/file11 - u := snapshotfs.NewUploader(env.RepositoryWriter) + u := upload.NewUploader(env.RepositoryWriter) man, err := u.Upload(ctx, sourceRoot, nil, snapshot.SourceInfo{}) require.NoError(t, err) diff --git a/snapshot/snapshotfs/snapshot_verifier_test.go b/snapshot/snapshotfs/snapshot_verifier_test.go index 4b60cf5f1..5febd4371 100644 --- a/snapshot/snapshotfs/snapshot_verifier_test.go +++ b/snapshot/snapshotfs/snapshot_verifier_test.go @@ -14,12 +14,13 @@ "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) func TestSnapshotVerifier(t *testing.T) { ctx, te := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) - u := snapshotfs.NewUploader(te.RepositoryWriter) + u := upload.NewUploader(te.RepositoryWriter) dir1 := mockfs.NewDirectory() si1 := te.LocalPathSourceInfo("/dummy/path") diff --git a/snapshot/snapshotfs/source_directories_internal_test.go b/snapshot/snapshotfs/source_directories_internal_test.go new file mode 100644 index 000000000..36962cb1f --- /dev/null +++ b/snapshot/snapshotfs/source_directories_internal_test.go @@ -0,0 +1,72 @@ +package snapshotfs + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSafeNameForMount(t *testing.T) { + cases := map[string]string{ + "/tmp/foo/bar": "tmp_foo_bar", + "/root": "root", + "/root/": "root", + "/": "__root", + "C:": "C", + "C:\\": "C", + "C:\\foo": "C_foo", + "C:\\foo/bar": "C_foo_bar", + "\\\\server\\root": "__server_root", + "\\\\server\\root\\": "__server_root", + "\\\\server\\root\\subdir": "__server_root_subdir", + "\\\\server\\root\\subdir/with/forward/slashes": "__server_root_subdir_with_forward_slashes", + "\\\\server\\root\\subdir/with\\mixed/slashes\\": "__server_root_subdir_with_mixed_slashes", + } + + for input, want := range cases { + assert.Equal(t, want, safeNameForMount(input), input) + } +} + +func TestDisambiguateSafeNames(t *testing.T) { + cases := []struct { + input map[string]string + want map[string]string + }{ + { + input: map[string]string{ + "c:/": "c", + "c:\\": "c", + "c:": "c", + "c": "c", + }, + want: map[string]string{ + "c": "c", + "c:": "c (2)", + "c:/": "c (3)", + "c:\\": "c (4)", + }, + }, + { + input: map[string]string{ + "c:/": "c", + "c:\\": "c", + "c:": "c", + "c": "c", + "c (2)": "c (2)", + }, + want: map[string]string{ + "c": "c", + "c (2)": "c (2)", + "c:": "c (2) (2)", + "c:/": "c (3)", + "c:\\": "c (4)", + }, + }, + } + + for _, tc := range cases { + require.Equal(t, tc.want, disambiguateSafeNames(tc.input)) + } +} diff --git a/snapshot/snapshotfs/source_directories_test.go b/snapshot/snapshotfs/source_directories_test.go index 70410fc85..44efccb07 100644 --- a/snapshot/snapshotfs/source_directories_test.go +++ b/snapshot/snapshotfs/source_directories_test.go @@ -1,11 +1,10 @@ -package snapshotfs +package snapshotfs_test import ( "context" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/kopia/kopia/fs" @@ -13,12 +12,14 @@ "github.com/kopia/kopia/internal/repotesting" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) func TestAllSources(t *testing.T) { ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) - u := NewUploader(env.RepositoryWriter) + u := upload.NewUploader(env.RepositoryWriter) man, err := u.Upload(ctx, mockfs.NewDirectory(), nil, snapshot.SourceInfo{Host: "dummy", UserName: "dummy", Path: "dummy"}) require.NoError(t, err) @@ -48,7 +49,7 @@ func TestAllSources(t *testing.T) { mustWriteSnapshotManifest(ctx, t, env.RepositoryWriter, snapshot.SourceInfo{UserName: m.user, Host: m.host, Path: m.path}, fs.UTCTimestampFromTime(ts), man) } - as := AllSourcesEntry(env.RepositoryWriter) + as := snapshotfs.AllSourcesEntry(env.RepositoryWriter) gotNames := iterateAllNames(ctx, t, as, "") wantNames := map[string]struct{}{ "another-user@some-host/": {}, @@ -111,67 +112,3 @@ func mustWriteSnapshotManifest(ctx context.Context, t *testing.T, rep repo.Repos _, err := snapshot.SaveSnapshot(ctx, rep, man) require.NoError(t, err) } - -func TestSafeNameForMount(t *testing.T) { - cases := map[string]string{ - "/tmp/foo/bar": "tmp_foo_bar", - "/root": "root", - "/root/": "root", - "/": "__root", - "C:": "C", - "C:\\": "C", - "C:\\foo": "C_foo", - "C:\\foo/bar": "C_foo_bar", - "\\\\server\\root": "__server_root", - "\\\\server\\root\\": "__server_root", - "\\\\server\\root\\subdir": "__server_root_subdir", - "\\\\server\\root\\subdir/with/forward/slashes": "__server_root_subdir_with_forward_slashes", - "\\\\server\\root\\subdir/with\\mixed/slashes\\": "__server_root_subdir_with_mixed_slashes", - } - - for input, want := range cases { - assert.Equal(t, want, safeNameForMount(input), input) - } -} - -func TestDisambiguateSafeNames(t *testing.T) { - cases := []struct { - input map[string]string - want map[string]string - }{ - { - input: map[string]string{ - "c:/": "c", - "c:\\": "c", - "c:": "c", - "c": "c", - }, - want: map[string]string{ - "c": "c", - "c:": "c (2)", - "c:/": "c (3)", - "c:\\": "c (4)", - }, - }, - { - input: map[string]string{ - "c:/": "c", - "c:\\": "c", - "c:": "c", - "c": "c", - "c (2)": "c (2)", - }, - want: map[string]string{ - "c": "c", - "c (2)": "c (2)", - "c:": "c (2) (2)", - "c:/": "c (3)", - "c:\\": "c (4)", - }, - }, - } - - for _, tc := range cases { - require.Equal(t, tc.want, disambiguateSafeNames(tc.input)) - } -} diff --git a/snapshot/snapshotmaintenance/helper_test.go b/snapshot/snapshotmaintenance/helper_test.go index 0ff0e0978..0cb4a19a5 100644 --- a/snapshot/snapshotmaintenance/helper_test.go +++ b/snapshot/snapshotmaintenance/helper_test.go @@ -10,7 +10,7 @@ "github.com/kopia/kopia/repo" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) // Create snapshots an FS entry. @@ -28,7 +28,7 @@ func createSnapshot(ctx context.Context, rep repo.RepositoryWriter, e fs.Entry, return nil, err } - u := snapshotfs.NewUploader(rep) + u := upload.NewUploader(rep) manifest, err := u.Upload(ctx, e, policyTree, si, previous...) if err != nil { diff --git a/snapshot/snapshotfs/checkpoint_registry.go b/snapshot/upload/checkpoint_registry.go similarity index 89% rename from snapshot/snapshotfs/checkpoint_registry.go rename to snapshot/upload/checkpoint_registry.go index 653842b5e..b84a53ada 100644 --- a/snapshot/snapshotfs/checkpoint_registry.go +++ b/snapshot/upload/checkpoint_registry.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "sync" @@ -7,6 +7,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/snapshotfs" ) // checkpointFunc is invoked when checkpoint occurs. The callback must checkpoint current state of @@ -41,7 +42,7 @@ func (r *checkpointRegistry) removeCheckpointCallback(entryName string) { // runCheckpoints invokes all registered checkpointers and adds results to the provided builder, while // randomizing file names for non-directory entries. this is to prevent the use of checkpointed objects // as authoritative on subsequent runs. -func (r *checkpointRegistry) runCheckpoints(checkpointBuilder *DirManifestBuilder) error { +func (r *checkpointRegistry) runCheckpoints(checkpointBuilder *snapshotfs.DirManifestBuilder) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/snapshot/snapshotfs/checkpoint_registry_test.go b/snapshot/upload/checkpoint_registry_test.go similarity index 95% rename from snapshot/snapshotfs/checkpoint_registry_test.go rename to snapshot/upload/checkpoint_registry_test.go index 819eb8505..6492fe1d5 100644 --- a/snapshot/snapshotfs/checkpoint_registry_test.go +++ b/snapshot/upload/checkpoint_registry_test.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "os" @@ -9,6 +9,7 @@ "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/mockfs" "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/snapshotfs" ) func TestCheckpointRegistry(t *testing.T) { @@ -54,7 +55,7 @@ func TestCheckpointRegistry(t *testing.T) { cp.removeCheckpointCallback(f3.Name()) cp.removeCheckpointCallback(f3.Name()) - var dmb DirManifestBuilder + var dmb snapshotfs.DirManifestBuilder dmb.AddEntry(&snapshot.DirEntry{ Name: "pre-existing", diff --git a/snapshot/snapshotfs/estimate.go b/snapshot/upload/estimate.go similarity index 99% rename from snapshot/snapshotfs/estimate.go rename to snapshot/upload/estimate.go index 4950b1594..38c83bb8e 100644 --- a/snapshot/snapshotfs/estimate.go +++ b/snapshot/upload/estimate.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "context" diff --git a/snapshot/snapshotfs/estimate_test.go b/snapshot/upload/estimate_test.go similarity index 87% rename from snapshot/snapshotfs/estimate_test.go rename to snapshot/upload/estimate_test.go index d6e95e405..c7302dfff 100644 --- a/snapshot/snapshotfs/estimate_test.go +++ b/snapshot/upload/estimate_test.go @@ -1,4 +1,4 @@ -package snapshotfs_test +package upload_test import ( "context" @@ -13,7 +13,7 @@ "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" ) type fakeProgress struct { @@ -31,7 +31,7 @@ func (p *fakeProgress) Error(context.Context, string, error, bool) {} func (p *fakeProgress) Stats( ctx context.Context, s *snapshot.Stats, - includedFiles, excludedFiles snapshotfs.SampleBuckets, + includedFiles, excludedFiles upload.SampleBuckets, excludedDirs []string, final bool, ) { @@ -62,6 +62,6 @@ func TestEstimate_SkipsStreamingDirectory(t *testing.T) { expectedErrors: 0, } - err := snapshotfs.Estimate(testlogging.Context(t), rootDir, policyTree, p, 1) + err := upload.Estimate(testlogging.Context(t), rootDir, policyTree, p, 1) require.NoError(t, err) } diff --git a/snapshot/snapshotfs/upload.go b/snapshot/upload/upload.go similarity index 97% rename from snapshot/snapshotfs/upload.go rename to snapshot/upload/upload.go index 87528864a..ed39557ff 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/upload/upload.go @@ -1,4 +1,5 @@ -package snapshotfs +// Package upload manages snapshot uploads. +package upload import ( "bytes" @@ -30,15 +31,14 @@ "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" + "github.com/kopia/kopia/snapshot/snapshotfs" ) // DefaultCheckpointInterval is the default frequency of mid-upload checkpointing. const DefaultCheckpointInterval = 45 * time.Minute var ( - uploadLog = logging.Module("uploader") - repoFSLog = logging.Module("repofs") - + uploadLog = logging.Module("uploader") uploadTracer = otel.Tracer("upload") ) @@ -64,7 +64,7 @@ type Uploader struct { totalWrittenBytes atomic.Int64 - Progress UploadProgress + Progress Progress // automatically cancel the Upload after certain number of bytes MaxUploadBytes int64 @@ -509,7 +509,7 @@ func (u *Uploader) uploadFileWithCheckpointing(ctx context.Context, relativePath // checkpointRoot invokes checkpoints on the provided registry and if a checkpoint entry was generated, // saves it in an incomplete snapshot manifest. func (u *Uploader) checkpointRoot(ctx context.Context, cp *checkpointRegistry, prototypeManifest *snapshot.Manifest) error { - var dmbCheckpoint DirManifestBuilder + var dmbCheckpoint snapshotfs.DirManifestBuilder if err := cp.runCheckpoints(&dmbCheckpoint); err != nil { return errors.Wrap(err, "running checkpointers") } @@ -586,7 +586,7 @@ func (u *Uploader) periodicallyCheckpoint(ctx context.Context, cp *checkpointReg // uploadDirWithCheckpointing uploads the specified Directory to the repository. func (u *Uploader) uploadDirWithCheckpointing(ctx context.Context, rootDir fs.Directory, policyTree *policy.Tree, previousDirs []fs.Directory, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) { var ( - dmb DirManifestBuilder + dmb snapshotfs.DirManifestBuilder cp checkpointRegistry ) @@ -654,14 +654,10 @@ func rootCauseError(err error) error { return err } -func isDir(e *snapshot.DirEntry) bool { - return e.Type == snapshot.EntryTypeDirectory -} - func (u *Uploader) processChildren( ctx context.Context, parentDirCheckpointRegistry *checkpointRegistry, - parentDirBuilder *DirManifestBuilder, + parentDirBuilder *snapshotfs.DirManifestBuilder, localDirPathOrEmpty, relativePath string, dir fs.Directory, policyTree *policy.Tree, @@ -786,7 +782,7 @@ func (u *Uploader) effectiveParallelFileReads(pol *policy.Policy) int { func (u *Uploader) processDirectoryEntries( ctx context.Context, parentCheckpointRegistry *checkpointRegistry, - parentDirBuilder *DirManifestBuilder, + parentDirBuilder *snapshotfs.DirManifestBuilder, localDirPathOrEmpty string, dirRelativePath string, dir fs.Directory, @@ -837,7 +833,7 @@ func (u *Uploader) processSingle( ctx context.Context, entry fs.Entry, entryRelativePath string, - parentDirBuilder *DirManifestBuilder, + parentDirBuilder *snapshotfs.DirManifestBuilder, policyTree *policy.Tree, prevDirs []fs.Directory, localDirPathOrEmpty string, @@ -872,7 +868,7 @@ func (u *Uploader) processSingle( switch entry := entry.(type) { case fs.Directory: - childDirBuilder := &DirManifestBuilder{} + childDirBuilder := &snapshotfs.DirManifestBuilder{} childLocalDirPathOrEmpty := "" if localDirPathOrEmpty != "" { @@ -958,7 +954,17 @@ func (u *Uploader) processSingle( } //nolint:unparam -func (u *Uploader) processEntryUploadResult(ctx context.Context, de *snapshot.DirEntry, err error, entryRelativePath string, parentDirBuilder *DirManifestBuilder, isIgnored bool, logDetail policy.LogDetail, logMessage string, t0 timetrack.Timer) error { +func (u *Uploader) processEntryUploadResult( + ctx context.Context, + de *snapshot.DirEntry, + err error, + entryRelativePath string, + parentDirBuilder *snapshotfs.DirManifestBuilder, + isIgnored bool, + logDetail policy.LogDetail, + logMessage string, + t0 timetrack.Timer, +) error { if err != nil { u.reportErrorAndMaybeCancel(err, isIgnored, parentDirBuilder, entryRelativePath) } else { @@ -1098,7 +1104,7 @@ func uploadDirInternal( policyTree *policy.Tree, previousDirs []fs.Directory, localDirPathOrEmpty, dirRelativePath string, - thisDirBuilder *DirManifestBuilder, + thisDirBuilder *snapshotfs.DirManifestBuilder, thisCheckpointRegistry *checkpointRegistry, ) (resultDE *snapshot.DirEntry, resultErr error) { atomic.AddInt32(&u.stats.TotalDirectoryCount, 1) @@ -1162,7 +1168,7 @@ func uploadDirInternal( checkpointManifest := thisCheckpointBuilder.Build(fs.UTCTimestampFromTime(directory.ModTime()), IncompleteReasonCheckpoint) - oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, checkpointManifest, metadataComp) + oid, err := snapshotfs.WriteDirManifest(ctx, u.repo, dirRelativePath, checkpointManifest, metadataComp) if err != nil { return nil, errors.Wrap(err, "error writing dir manifest") } @@ -1177,7 +1183,7 @@ func uploadDirInternal( dirManifest := thisDirBuilder.Build(fs.UTCTimestampFromTime(directory.ModTime()), u.incompleteReason()) - oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, dirManifest, metadataComp) + oid, err := snapshotfs.WriteDirManifest(ctx, u.repo, dirRelativePath, dirManifest, metadataComp) if err != nil { return nil, errors.Wrapf(err, "error writing dir manifest: %v", directory.Name()) } @@ -1185,7 +1191,7 @@ func uploadDirInternal( return newDirEntryWithSummary(directory, oid, dirManifest.Summary) } -func (u *Uploader) reportErrorAndMaybeCancel(err error, isIgnored bool, dmb *DirManifestBuilder, entryRelativePath string) { +func (u *Uploader) reportErrorAndMaybeCancel(err error, isIgnored bool, dmb *snapshotfs.DirManifestBuilder, entryRelativePath string) { if u.IsCanceled() && errors.Is(err, errCanceled) { // already canceled, do not report another. return @@ -1227,7 +1233,7 @@ func (u *Uploader) maybeOpenDirectoryFromManifest(ctx context.Context, man *snap return nil } - ent := EntryFromDirEntry(u.repo, man.RootEntry) + ent := snapshotfs.EntryFromDirEntry(u.repo, man.RootEntry) dir, ok := ent.(fs.Directory) if !ok { diff --git a/snapshot/snapshotfs/upload_actions.go b/snapshot/upload/upload_actions.go similarity index 99% rename from snapshot/snapshotfs/upload_actions.go rename to snapshot/upload/upload_actions.go index 957245bbb..f8fdd0346 100644 --- a/snapshot/snapshotfs/upload_actions.go +++ b/snapshot/upload/upload_actions.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "bufio" diff --git a/snapshot/snapshotfs/upload_estimator.go b/snapshot/upload/upload_estimator.go similarity index 99% rename from snapshot/snapshotfs/upload_estimator.go rename to snapshot/upload/upload_estimator.go index ee3f32647..f3b7c9e8c 100644 --- a/snapshot/snapshotfs/upload_estimator.go +++ b/snapshot/upload/upload_estimator.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "context" diff --git a/snapshot/snapshotfs/upload_estimator_test.go b/snapshot/upload/upload_estimator_test.go similarity index 80% rename from snapshot/snapshotfs/upload_estimator_test.go rename to snapshot/upload/upload_estimator_test.go index 083b64839..737af4bb0 100644 --- a/snapshot/snapshotfs/upload_estimator_test.go +++ b/snapshot/upload/upload_estimator_test.go @@ -1,4 +1,4 @@ -package snapshotfs_test +package upload_test import ( "context" @@ -10,7 +10,7 @@ vsi "github.com/kopia/kopia/internal/volumesizeinfo" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/snapshot/policy" - "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -56,15 +56,15 @@ func getMockLogger() logging.Logger { // withFailedVolumeSizeInfo returns EstimatorOption which ensures that GetVolumeSizeInfo will fail with provided error. // Purposed for tests. -func withFailedVolumeSizeInfo(err error) snapshotfs.EstimatorOption { - return snapshotfs.WithVolumeSizeInfoFn(func(_ string) (vsi.VolumeSizeInfo, error) { +func withFailedVolumeSizeInfo(err error) upload.EstimatorOption { + return upload.WithVolumeSizeInfoFn(func(_ string) (vsi.VolumeSizeInfo, error) { return vsi.VolumeSizeInfo{}, err }) } // withVolumeSizeInfo returns EstimatorOption which provides fake volume size. -func withVolumeSizeInfo(filesCount, usedFileSize, totalFileSize uint64) snapshotfs.EstimatorOption { - return snapshotfs.WithVolumeSizeInfoFn(func(_ string) (vsi.VolumeSizeInfo, error) { +func withVolumeSizeInfo(filesCount, usedFileSize, totalFileSize uint64) upload.EstimatorOption { + return upload.WithVolumeSizeInfoFn(func(_ string) (vsi.VolumeSizeInfo, error) { return vsi.VolumeSizeInfo{ TotalSize: totalFileSize, UsedSize: usedFileSize, @@ -76,7 +76,7 @@ func withVolumeSizeInfo(filesCount, usedFileSize, totalFileSize uint64) snapshot func expectSuccessfulEstimation( ctx context.Context, t *testing.T, - estimator snapshotfs.Estimator, + estimator upload.Estimator, expectedNumberOfFiles, expectedDataSize int64, ) { @@ -121,7 +121,7 @@ func TestUploadEstimator(t *testing.T) { logger := getMockLogger() policyTree := policy.BuildTree(nil, policy.DefaultPolicy) - estimator := snapshotfs.NewEstimator(dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeClassic}, logger) + estimator := upload.NewEstimator(dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeClassic}, logger) estimationCtx := context.Background() expectSuccessfulEstimation(estimationCtx, t, estimator, expectedNumberOfFiles, expectedDataSize) @@ -133,8 +133,8 @@ func TestUploadEstimator(t *testing.T) { expectedDataSize := int64(2000) policyTree := policy.BuildTree(nil, policy.DefaultPolicy) - estimator := snapshotfs.NewEstimator( - dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeRough}, logger, + estimator := upload.NewEstimator( + dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeRough}, logger, withVolumeSizeInfo(uint64(expectedNumberOfFiles), uint64(expectedDataSize), 3000)) estimationCtx := context.Background() @@ -145,8 +145,8 @@ func TestUploadEstimator(t *testing.T) { logger := getMockLogger() policyTree := policy.BuildTree(nil, policy.DefaultPolicy) - estimator := snapshotfs.NewEstimator( - dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeRough}, logger, + estimator := upload.NewEstimator( + dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeRough}, logger, withFailedVolumeSizeInfo(errSimulated)) estimationCtx := context.Background() @@ -162,9 +162,9 @@ func TestUploadEstimator(t *testing.T) { expectedDataSize := int64(2000) policyTree := policy.BuildTree(nil, policy.DefaultPolicy) - estimator := snapshotfs.NewEstimator( + estimator := upload.NewEstimator( dir1, policyTree, - snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeAdaptive, AdaptiveThreshold: 100}, logger, + upload.EstimationParameters{Type: upload.EstimationTypeAdaptive, AdaptiveThreshold: 100}, logger, withVolumeSizeInfo(uint64(expectedNumberOfFiles), uint64(expectedDataSize), 3000)) estimationCtx := context.Background() @@ -175,9 +175,9 @@ func TestUploadEstimator(t *testing.T) { logger := getMockLogger() policyTree := policy.BuildTree(nil, policy.DefaultPolicy) - estimator := snapshotfs.NewEstimator( + estimator := upload.NewEstimator( dir1, policyTree, - snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeAdaptive, AdaptiveThreshold: 10000}, logger, + upload.EstimationParameters{Type: upload.EstimationTypeAdaptive, AdaptiveThreshold: 10000}, logger, withVolumeSizeInfo(uint64(1000), uint64(2000), 3000)) estimationCtx := context.Background() @@ -188,8 +188,8 @@ func TestUploadEstimator(t *testing.T) { logger := getMockLogger() policyTree := policy.BuildTree(nil, policy.DefaultPolicy) - estimator := snapshotfs.NewEstimator( - dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeAdaptive, AdaptiveThreshold: 1}, logger, + estimator := upload.NewEstimator( + dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeAdaptive, AdaptiveThreshold: 1}, logger, withFailedVolumeSizeInfo(errSimulated)) estimationCtx := context.Background() @@ -214,7 +214,7 @@ func TestUploadEstimator(t *testing.T) { logger := getMockLogger() policyTree := policy.BuildTree(nil, policy.DefaultPolicy) - estimator := snapshotfs.NewEstimator(dir2, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeRough}, logger) + estimator := upload.NewEstimator(dir2, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeRough}, logger) // In case of canceled context, we should get zeroes instead of estimated numbers expectSuccessfulEstimation(testCtx, t, estimator, 0, 0) @@ -229,7 +229,7 @@ func TestUploadEstimator(t *testing.T) { logger := getMockLogger() policyTree := policy.BuildTree(nil, policy.DefaultPolicy) - estimator := snapshotfs.NewEstimator(dir2, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeClassic}, logger) + estimator := upload.NewEstimator(dir2, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeClassic}, logger) dir2.Subdir("d1").OnReaddir(func() { estimator.Cancel() @@ -248,7 +248,7 @@ func TestUploadEstimator(t *testing.T) { }, policy.DefaultPolicy) logger := getMockLogger() - estimator := snapshotfs.NewEstimator(dir1, policyTree, snapshotfs.EstimationParameters{Type: snapshotfs.EstimationTypeClassic}, logger) + estimator := upload.NewEstimator(dir1, policyTree, upload.EstimationParameters{Type: upload.EstimationTypeClassic}, logger) expectSuccessfulEstimation(context.Background(), t, estimator, expectedNumberOfFiles-1, expectedDataSize-int64(len(file1Content))) }) diff --git a/snapshot/snapshotfs/upload_os_snapshot_nonwindows.go b/snapshot/upload/upload_os_snapshot_nonwindows.go similarity index 96% rename from snapshot/snapshotfs/upload_os_snapshot_nonwindows.go rename to snapshot/upload/upload_os_snapshot_nonwindows.go index e0039e12d..0216762c7 100644 --- a/snapshot/snapshotfs/upload_os_snapshot_nonwindows.go +++ b/snapshot/upload/upload_os_snapshot_nonwindows.go @@ -1,7 +1,7 @@ //go:build !windows // +build !windows -package snapshotfs +package upload import ( "context" diff --git a/snapshot/snapshotfs/upload_os_snapshot_windows.go b/snapshot/upload/upload_os_snapshot_windows.go similarity index 99% rename from snapshot/snapshotfs/upload_os_snapshot_windows.go rename to snapshot/upload/upload_os_snapshot_windows.go index d9f11f39a..14a62ebd9 100644 --- a/snapshot/snapshotfs/upload_os_snapshot_windows.go +++ b/snapshot/upload/upload_os_snapshot_windows.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "context" diff --git a/snapshot/snapshotfs/upload_progress.go b/snapshot/upload/upload_progress.go similarity index 95% rename from snapshot/snapshotfs/upload_progress.go rename to snapshot/upload/upload_progress.go index 4f690f03c..03561d0a5 100644 --- a/snapshot/snapshotfs/upload_progress.go +++ b/snapshot/upload/upload_progress.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "sync" @@ -26,10 +26,10 @@ type EstimationParameters struct { AdaptiveThreshold int64 } -// UploadProgress is invoked by uploader to report status of file and directory uploads. +// Progress is invoked by uploader to report status of file and directory uploads. // //nolint:interfacebloat -type UploadProgress interface { +type Progress interface { // Enabled returns true when progress is enabled, false otherwise. Enabled() bool @@ -163,10 +163,10 @@ func (p *NullUploadProgress) EstimationParameters() EstimationParameters { } } -var _ UploadProgress = (*NullUploadProgress)(nil) +var _ Progress = (*NullUploadProgress)(nil) -// UploadCounters represents a snapshot of upload counters. -type UploadCounters struct { +// Counters represents a snapshot of upload counters. +type Counters struct { // +checkatomic TotalCachedBytes int64 `json:"cachedBytes"` // +checkatomic @@ -206,13 +206,13 @@ type CountingUploadProgress struct { mu sync.Mutex - counters UploadCounters + counters Counters } // UploadStarted implements UploadProgress. func (p *CountingUploadProgress) UploadStarted() { // reset counters to all-zero values. - p.counters = UploadCounters{} + p.counters = Counters{} } // UploadedBytes implements UploadProgress. @@ -289,11 +289,11 @@ func (p *CountingUploadProgress) StartedDirectory(dirname string) { } // Snapshot captures current snapshot of the upload. -func (p *CountingUploadProgress) Snapshot() UploadCounters { +func (p *CountingUploadProgress) Snapshot() Counters { p.mu.Lock() defer p.mu.Unlock() - return UploadCounters{ + return Counters{ TotalCachedFiles: atomic.LoadInt32(&p.counters.TotalCachedFiles), TotalHashedFiles: atomic.LoadInt32(&p.counters.TotalHashedFiles), TotalCachedBytes: atomic.LoadInt64(&p.counters.TotalCachedBytes), @@ -342,4 +342,4 @@ func (p *CountingUploadProgress) UITaskCounters(final bool) map[string]uitask.Co return m } -var _ UploadProgress = (*CountingUploadProgress)(nil) +var _ Progress = (*CountingUploadProgress)(nil) diff --git a/snapshot/snapshotfs/upload_scan.go b/snapshot/upload/upload_scan.go similarity index 97% rename from snapshot/snapshotfs/upload_scan.go rename to snapshot/upload/upload_scan.go index 9874d656f..8b4e39860 100644 --- a/snapshot/snapshotfs/upload_scan.go +++ b/snapshot/upload/upload_scan.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "context" diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/upload/upload_test.go similarity index 99% rename from snapshot/snapshotfs/upload_test.go rename to snapshot/upload/upload_test.go index 588e53a06..0b3871a95 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/upload/upload_test.go @@ -1,4 +1,4 @@ -package snapshotfs +package upload import ( "bytes" @@ -44,6 +44,7 @@ "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" + "github.com/kopia/kopia/snapshot/snapshotfs" ) const ( @@ -284,7 +285,7 @@ func TestUploadMetadataCompression(t *testing.T) { t.Errorf("Upload error: %v", err) } - dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) entries := findAllEntries(t, ctx, dir) verifyMetadataCompressor(t, ctx, th.repo, entries, compression.HeaderZstdFastest) }) @@ -305,7 +306,7 @@ func TestUploadMetadataCompression(t *testing.T) { t.Errorf("Upload error: %v", err) } - dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) entries := findAllEntries(t, ctx, dir) verifyMetadataCompressor(t, ctx, th.repo, entries, content.NoCompression) }) @@ -326,7 +327,7 @@ func TestUploadMetadataCompression(t *testing.T) { t.Errorf("Upload error: %v", err) } - dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) entries := findAllEntries(t, ctx, dir) verifyMetadataCompressor(t, ctx, th.repo, entries, compression.ByName["gzip"].HeaderID()) }) @@ -628,12 +629,12 @@ func TestUpload_SubDirectoryReadFailureSomeIgnoredNoFailFast(t *testing.T) { } type mockProgress struct { - UploadProgress + Progress finishedFileCheck func(string, error) } func (mp *mockProgress) FinishedFile(relativePath string, err error) { - defer mp.UploadProgress.FinishedFile(relativePath, err) + defer mp.Progress.FinishedFile(relativePath, err) mp.finishedFileCheck(relativePath, err) } @@ -657,7 +658,7 @@ func TestUpload_FinishedFileProgress(t *testing.T) { u := NewUploader(th.repo) u.ForceHashPercentage = 0 u.Progress = &mockProgress{ - UploadProgress: u.Progress, + Progress: u.Progress, finishedFileCheck: func(relativePath string, err error) { defer func() { mu.Lock() @@ -1280,7 +1281,7 @@ func TestParallelUploadOfLargeFiles(t *testing.T) { t.Logf("man: %v", man.RootObjectID()) - dir := EntryFromDirEntry(th.repo, man.RootEntry).(fs.Directory) + dir := snapshotfs.EntryFromDirEntry(th.repo, man.RootEntry).(fs.Directory) successCount := 0 diff --git a/tests/tools/kopiaclient/kopiaclient.go b/tests/tools/kopiaclient/kopiaclient.go index d1bc6b902..184edcf05 100644 --- a/tests/tools/kopiaclient/kopiaclient.go +++ b/tests/tools/kopiaclient/kopiaclient.go @@ -26,6 +26,7 @@ "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/kopia/kopia/snapshot/upload" "github.com/kopia/kopia/tests/robustness" ) @@ -112,7 +113,7 @@ func (kc *KopiaClient) SnapshotCreate(ctx context.Context, key string, val []byt } source := kc.getSourceForKeyVal(key, val) - u := snapshotfs.NewUploader(rw) + u := upload.NewUploader(rw) man, err := u.Upload(ctx, source, policyTree, si) if err != nil {