diff --git a/cli/command_manifest_ls.go b/cli/command_manifest_ls.go index 7575928f3..d84ce3e06 100644 --- a/cli/command_manifest_ls.go +++ b/cli/command_manifest_ls.go @@ -31,7 +31,10 @@ func listManifestItems(ctx context.Context, rep *repo.Repository) error { filter[kv[0:p]] = kv[p+1:] } - items := rep.Manifests.Find(filter) + items, err := rep.Manifests.Find(ctx, filter) + if err != nil { + return err + } sort.Slice(items, func(i, j int) bool { for _, key := range *manifestListSort { diff --git a/cli/command_manifest_show.go b/cli/command_manifest_show.go index ef0062e4f..7aa62ffc5 100644 --- a/cli/command_manifest_show.go +++ b/cli/command_manifest_show.go @@ -19,12 +19,12 @@ func init() { func showManifestItems(ctx context.Context, rep *repo.Repository) error { for _, it := range *manifestShowItems { - md, err := rep.Manifests.GetMetadata(it) + md, err := rep.Manifests.GetMetadata(ctx, it) if err != nil { return fmt.Errorf("error getting metadata for %q: %v", it, err) } - b, err := rep.Manifests.GetRaw(it) + b, err := rep.Manifests.GetRaw(ctx, it) if err != nil { return fmt.Errorf("error showing %q: %v", it, err) } diff --git a/cli/command_object_verify.go b/cli/command_object_verify.go index 61fb0ab0c..c9a8f231b 100644 --- a/cli/command_object_verify.go +++ b/cli/command_object_verify.go @@ -200,7 +200,7 @@ func runVerifyCommand(ctx context.Context, rep *repo.Repository) error { } func enqueueRootsToVerify(ctx context.Context, v *verifier, rep *repo.Repository) error { - manifests, err := loadSourceManifests(rep, *verifyCommandAllSources, *verifyCommandSources) + manifests, err := loadSourceManifests(ctx, rep, *verifyCommandAllSources, *verifyCommandSources) if err != nil { return err } @@ -239,20 +239,28 @@ func enqueueRootsToVerify(ctx context.Context, v *verifier, rep *repo.Repository return nil } -func loadSourceManifests(rep *repo.Repository, all bool, sources []string) ([]*snapshot.Manifest, error) { +func loadSourceManifests(ctx context.Context, rep *repo.Repository, all bool, sources []string) ([]*snapshot.Manifest, error) { var manifestIDs []string if *verifyCommandAllSources { - manifestIDs = append(manifestIDs, snapshot.ListSnapshotManifests(rep, nil)...) + man, err := snapshot.ListSnapshotManifests(ctx, rep, nil) + if err != nil { + return nil, err + } + manifestIDs = append(manifestIDs, man...) } else { for _, srcStr := range *verifyCommandSources { src, err := snapshot.ParseSourceInfo(srcStr, getHostName(), getUserName()) if err != nil { return nil, fmt.Errorf("error parsing %q: %v", srcStr, err) } - manifestIDs = append(manifestIDs, snapshot.ListSnapshotManifests(rep, &src)...) + man, err := snapshot.ListSnapshotManifests(ctx, rep, &src) + if err != nil { + return nil, err + } + manifestIDs = append(manifestIDs, man...) } } - return snapshot.LoadSnapshots(rep, manifestIDs) + return snapshot.LoadSnapshots(ctx, rep, manifestIDs) } func init() { diff --git a/cli/command_policy.go b/cli/command_policy.go index 803e554c4..1b2635438 100644 --- a/cli/command_policy.go +++ b/cli/command_policy.go @@ -1,6 +1,7 @@ package cli import ( + "context" "fmt" "github.com/kopia/kopia/policy" @@ -8,7 +9,7 @@ "github.com/kopia/kopia/snapshot" ) -func policyTargets(rep *repo.Repository, globalFlag *bool, targetsFlag *[]string) ([]snapshot.SourceInfo, error) { +func policyTargets(ctx context.Context, rep *repo.Repository, globalFlag *bool, targetsFlag *[]string) ([]snapshot.SourceInfo, error) { if *globalFlag == (len(*targetsFlag) > 0) { return nil, fmt.Errorf("must pass either '--global' or a list of path targets") } @@ -21,7 +22,7 @@ func policyTargets(rep *repo.Repository, globalFlag *bool, targetsFlag *[]string var res []snapshot.SourceInfo for _, ts := range *targetsFlag { - if t, err := policy.GetPolicyByID(rep, ts); err == nil { + if t, err := policy.GetPolicyByID(ctx, rep, ts); err == nil { res = append(res, t.Target()) continue } diff --git a/cli/command_policy_edit.go b/cli/command_policy_edit.go index 945231d31..851bf9a46 100644 --- a/cli/command_policy_edit.go +++ b/cli/command_policy_edit.go @@ -57,13 +57,13 @@ func init() { } func editPolicy(ctx context.Context, rep *repo.Repository) error { - targets, err := policyTargets(rep, policyEditGlobal, policyEditTargets) + targets, err := policyTargets(ctx, rep, policyEditGlobal, policyEditTargets) if err != nil { return err } for _, target := range targets { - original, err := policy.GetDefinedPolicy(rep, target) + original, err := policy.GetDefinedPolicy(ctx, rep, target) if err == policy.ErrPolicyNotFound { original = &policy.Policy{} } @@ -98,7 +98,7 @@ func editPolicy(ctx context.Context, rep *repo.Repository) error { fmt.Scanf("%v", &shouldSave) //nolint:errcheck if strings.HasPrefix(strings.ToLower(shouldSave), "y") { - if err := policy.SetPolicy(rep, target, updated); err != nil { + if err := policy.SetPolicy(ctx, rep, target, updated); err != nil { return fmt.Errorf("can't save policy for %v: %v", target, err) } } diff --git a/cli/command_policy_ls.go b/cli/command_policy_ls.go index 17842ae67..761cd7ca4 100644 --- a/cli/command_policy_ls.go +++ b/cli/command_policy_ls.go @@ -18,7 +18,7 @@ func init() { } func listPolicies(ctx context.Context, rep *repo.Repository) error { - policies, err := policy.ListPolicies(rep) + policies, err := policy.ListPolicies(ctx, rep) if err != nil { return err } diff --git a/cli/command_policy_remove.go b/cli/command_policy_remove.go index 830881ab1..1bdd36352 100644 --- a/cli/command_policy_remove.go +++ b/cli/command_policy_remove.go @@ -18,14 +18,14 @@ func init() { } func removePolicy(ctx context.Context, rep *repo.Repository) error { - targets, err := policyTargets(rep, policyRemoveGlobal, policyRemoveTargets) + targets, err := policyTargets(ctx, rep, policyRemoveGlobal, policyRemoveTargets) if err != nil { return err } for _, target := range targets { log.Infof("Removing policy on %q...", target) - if err := policy.RemovePolicy(rep, target); err != nil { + if err := policy.RemovePolicy(ctx, rep, target); err != nil { return err } } diff --git a/cli/command_policy_set.go b/cli/command_policy_set.go index ecd900c9a..a14bf928c 100644 --- a/cli/command_policy_set.go +++ b/cli/command_policy_set.go @@ -54,13 +54,13 @@ func init() { } func setPolicy(ctx context.Context, rep *repo.Repository) error { - targets, err := policyTargets(rep, policySetGlobal, policySetTargets) + targets, err := policyTargets(ctx, rep, policySetGlobal, policySetTargets) if err != nil { return err } for _, target := range targets { - p, err := policy.GetDefinedPolicy(rep, target) + p, err := policy.GetDefinedPolicy(ctx, rep, target) if err == policy.ErrPolicyNotFound { p = &policy.Policy{} } @@ -76,7 +76,7 @@ func setPolicy(ctx context.Context, rep *repo.Repository) error { return fmt.Errorf("no changes specified") } - if err := policy.SetPolicy(rep, target, p); err != nil { + if err := policy.SetPolicy(ctx, rep, target, p); err != nil { return fmt.Errorf("can't save policy for %v: %v", target, err) } } diff --git a/cli/command_policy_show.go b/cli/command_policy_show.go index abf83146f..b49b83a69 100644 --- a/cli/command_policy_show.go +++ b/cli/command_policy_show.go @@ -21,13 +21,13 @@ func init() { } func showPolicy(ctx context.Context, rep *repo.Repository) error { - targets, err := policyTargets(rep, policyShowGlobal, policyShowTargets) + targets, err := policyTargets(ctx, rep, policyShowGlobal, policyShowTargets) if err != nil { return err } for _, target := range targets { - effective, policies, err := policy.GetEffectivePolicy(rep, target) + effective, policies, err := policy.GetEffectivePolicy(ctx, rep, target) if err != nil { return fmt.Errorf("can't get effective policy for %q: %v", target, err) } diff --git a/cli/command_repository_migrate.go b/cli/command_repository_migrate.go index f7981d468..4b3341927 100644 --- a/cli/command_repository_migrate.go +++ b/cli/command_repository_migrate.go @@ -32,7 +32,7 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error { return fmt.Errorf("can't open source repository: %v", err) } - sources, err := getSourcesToMigrate(sourceRepo) + sources, err := getSourcesToMigrate(ctx, sourceRepo) if err != nil { return fmt.Errorf("can't retrieve sources: %v", err) } @@ -68,8 +68,11 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error { func migrateSingleSource(ctx context.Context, uploader *upload.Uploader, sourceRepo, destRepo *repo.Repository, s snapshot.SourceInfo) error { log.Debugf("migrating source %v", s) - manifests := snapshot.ListSnapshotManifests(sourceRepo, &s) - snapshots, err := snapshot.LoadSnapshots(sourceRepo, manifests) + manifests, err := snapshot.ListSnapshotManifests(ctx, sourceRepo, &s) + if err != nil { + return err + } + snapshots, err := snapshot.LoadSnapshots(ctx, sourceRepo, manifests) if err != nil { return fmt.Errorf("unable to load snapshot manifests for %v: %v", s, err) } @@ -86,7 +89,7 @@ func migrateSingleSource(ctx context.Context, uploader *upload.Uploader, sourceR m.Stats = newm.Stats m.IncompleteReason = newm.IncompleteReason - if _, err := snapshot.SaveSnapshot(destRepo, m); err != nil { + if _, err := snapshot.SaveSnapshot(ctx, destRepo, m); err != nil { return fmt.Errorf("cannot save manifest: %v", err) } } @@ -101,7 +104,7 @@ func filterSnapshotsToMigrate(s []*snapshot.Manifest) []*snapshot.Manifest { return s } -func getSourcesToMigrate(rep *repo.Repository) ([]snapshot.SourceInfo, error) { +func getSourcesToMigrate(ctx context.Context, rep *repo.Repository) ([]snapshot.SourceInfo, error) { if len(*migrateSources) > 0 { var result []snapshot.SourceInfo @@ -118,7 +121,7 @@ func getSourcesToMigrate(rep *repo.Repository) ([]snapshot.SourceInfo, error) { } if *migrateAll { - return snapshot.ListSources(rep), nil + return snapshot.ListSources(ctx, rep) } return nil, nil diff --git a/cli/command_snapshot_create.go b/cli/command_snapshot_create.go index 6eaac99aa..886fda8d6 100644 --- a/cli/command_snapshot_create.go +++ b/cli/command_snapshot_create.go @@ -37,7 +37,7 @@ func runBackupCommand(ctx context.Context, rep *repo.Repository) error { sources := *snapshotCreateSources if *snapshotCreateAll { - local, err := getLocalBackupPaths(rep) + local, err := getLocalBackupPaths(ctx, rep) if err != nil { return err } @@ -90,12 +90,12 @@ func snapshotSingleSource(ctx context.Context, rep *repo.Repository, u *upload.U localEntry := mustGetLocalFSEntry(sourceInfo.Path) - previousManifest, err := findPreviousSnapshotManifest(rep, sourceInfo) + previousManifest, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo) if err != nil { return err } - u.FilesPolicy, err = policy.FilesPolicyGetter(rep, sourceInfo) + u.FilesPolicy, err = policy.FilesPolicyGetter(ctx, rep, sourceInfo) if err != nil { return err } @@ -108,7 +108,7 @@ func snapshotSingleSource(ctx context.Context, rep *repo.Repository, u *upload.U manifest.Description = *snapshotCreateDescription - snapID, err := snapshot.SaveSnapshot(rep, manifest) + snapID, err := snapshot.SaveSnapshot(ctx, rep, manifest) if err != nil { return fmt.Errorf("cannot save manifest: %v", err) } @@ -122,8 +122,8 @@ func snapshotSingleSource(ctx context.Context, rep *repo.Repository, u *upload.U return nil } -func findPreviousSnapshotManifest(rep *repo.Repository, sourceInfo snapshot.SourceInfo) (*snapshot.Manifest, error) { - previous, err := snapshot.ListSnapshots(rep, sourceInfo) +func findPreviousSnapshotManifest(ctx context.Context, rep *repo.Repository, sourceInfo snapshot.SourceInfo) (*snapshot.Manifest, error) { + previous, err := snapshot.ListSnapshots(ctx, rep, sourceInfo) if err != nil { return nil, fmt.Errorf("error listing previous backups: %v", err) } @@ -144,12 +144,15 @@ func findPreviousSnapshotManifest(rep *repo.Repository, sourceInfo snapshot.Sour return previousManifest, nil } -func getLocalBackupPaths(rep *repo.Repository) ([]string, error) { +func getLocalBackupPaths(ctx context.Context, rep *repo.Repository) ([]string, error) { h := getHostName() u := getUserName() log.Debugf("Looking for previous backups of '%v@%v'...", u, h) - sources := snapshot.ListSources(rep) + sources, err := snapshot.ListSources(ctx, rep) + if err != nil { + return nil, fmt.Errorf("unable to list sources: %v", err) + } var result []string diff --git a/cli/command_snapshot_estimate.go b/cli/command_snapshot_estimate.go index e60433abe..bd54d4802 100644 --- a/cli/command_snapshot_estimate.go +++ b/cli/command_snapshot_estimate.go @@ -86,7 +86,7 @@ func runSnapshotEstimateCommand(ctx context.Context, rep *repo.Repository) error entry := mustGetLocalFSEntry(path) if dir, ok := entry.(fs.Directory); ok { - ignorePolicy, err := policy.FilesPolicyGetter(rep, sourceInfo) + ignorePolicy, err := policy.FilesPolicyGetter(ctx, rep, sourceInfo) if err != nil { return err } diff --git a/cli/command_snapshot_expire.go b/cli/command_snapshot_expire.go index 1cc66b103..16c45f52c 100644 --- a/cli/command_snapshot_expire.go +++ b/cli/command_snapshot_expire.go @@ -19,14 +19,14 @@ snapshotExpireDelete = snapshotExpireCommand.Flag("delete", "Whether to actually delete snapshots").Default("no").String() ) -func getSnapshotNamesToExpire(rep *repo.Repository) ([]string, error) { +func getSnapshotNamesToExpire(ctx context.Context, rep *repo.Repository) ([]string, error) { if !*snapshotExpireAll && len(*snapshotExpirePaths) == 0 { return nil, fmt.Errorf("Must specify paths to expire or --all") } if *snapshotExpireAll { printStderr("Scanning all active snapshots...\n") - return snapshot.ListSnapshotManifests(rep, nil), nil + return snapshot.ListSnapshotManifests(ctx, rep, nil) } var result []string @@ -39,7 +39,7 @@ func getSnapshotNamesToExpire(rep *repo.Repository) ([]string, error) { log.Debugf("Looking for snapshots of %v", src) - matches := snapshot.ListSnapshotManifests(rep, &src) + matches, err := snapshot.ListSnapshotManifests(ctx, rep, &src) if err != nil { return nil, fmt.Errorf("error listing snapshots for %v: %v", src, err) } @@ -53,17 +53,17 @@ func getSnapshotNamesToExpire(rep *repo.Repository) ([]string, error) { } func runExpireCommand(ctx context.Context, rep *repo.Repository) error { - snapshotNames, err := getSnapshotNamesToExpire(rep) + snapshotNames, err := getSnapshotNamesToExpire(ctx, rep) if err != nil { return err } - snapshots, err := snapshot.LoadSnapshots(rep, snapshotNames) + snapshots, err := snapshot.LoadSnapshots(ctx, rep, snapshotNames) if err != nil { return err } snapshots = filterHostAndUser(snapshots) - toDelete, err := policy.GetExpiredSnapshots(rep, snapshots) + toDelete, err := policy.GetExpiredSnapshots(ctx, rep, snapshots) if err != nil { return err } diff --git a/cli/command_snapshot_list.go b/cli/command_snapshot_list.go index 390e71c7e..b149c0020 100644 --- a/cli/command_snapshot_list.go +++ b/cli/command_snapshot_list.go @@ -29,9 +29,12 @@ maxResultsPerPath = snapshotListCommand.Flag("max-results", "Maximum number of results.").Default("1000").Int() ) -func findSnapshotsForSource(rep *repo.Repository, sourceInfo snapshot.SourceInfo) (manifestIDs []string, relPath string, err error) { +func findSnapshotsForSource(ctx context.Context, rep *repo.Repository, sourceInfo snapshot.SourceInfo) (manifestIDs []string, relPath string, err error) { for len(sourceInfo.Path) > 0 { - list := snapshot.ListSnapshotManifests(rep, &sourceInfo) + list, err := snapshot.ListSnapshotManifests(ctx, rep, &sourceInfo) + if err != nil { + return nil, "", err + } if len(list) > 0 { return list, relPath, nil @@ -55,9 +58,10 @@ func findSnapshotsForSource(rep *repo.Repository, sourceInfo snapshot.SourceInfo return nil, "", nil } -func findManifestIDs(rep *repo.Repository, source string) ([]string, string, error) { +func findManifestIDs(ctx context.Context, rep *repo.Repository, source string) ([]string, string, error) { if source == "" { - return snapshot.ListSnapshotManifests(rep, nil), "", nil + man, err := snapshot.ListSnapshotManifests(ctx, rep, nil) + return man, "", err } si, err := snapshot.ParseSourceInfo(source, getHostName(), getUserName()) @@ -65,7 +69,7 @@ func findManifestIDs(rep *repo.Repository, source string) ([]string, string, err return nil, "", fmt.Errorf("invalid directory: '%s': %s", source, err) } - manifestIDs, relPath, err := findSnapshotsForSource(rep, si) + manifestIDs, relPath, err := findSnapshotsForSource(ctx, rep, si) if relPath != "" { relPath = "/" + relPath } @@ -74,12 +78,12 @@ func findManifestIDs(rep *repo.Repository, source string) ([]string, string, err } func runSnapshotsCommand(ctx context.Context, rep *repo.Repository) error { - manifestIDs, relPath, err := findManifestIDs(rep, *snapshotListPath) + manifestIDs, relPath, err := findManifestIDs(ctx, rep, *snapshotListPath) if err != nil { return err } - manifests, err := snapshot.LoadSnapshots(rep, manifestIDs) + manifests, err := snapshot.LoadSnapshots(ctx, rep, manifestIDs) if err != nil { return err } @@ -94,7 +98,7 @@ func outputManifestGroups(ctx context.Context, rep *repo.Repository, manifests [ fmt.Printf("%v%v\n", separator, src) separator = "\n" - pol, _, err := policy.GetEffectivePolicy(rep, src) + pol, _, err := policy.GetEffectivePolicy(ctx, rep, src) if err != nil { log.Warningf("unable to determine effective policy for %v", src) } else { diff --git a/fs/repofs/all_sources.go b/fs/repofs/all_sources.go index c4d7cbde9..4ea387b5c 100644 --- a/fs/repofs/all_sources.go +++ b/fs/repofs/all_sources.go @@ -30,7 +30,10 @@ func (s *repositoryAllSources) Metadata() *fs.EntryMetadata { } func (s *repositoryAllSources) Readdir(ctx context.Context) (fs.Entries, error) { - srcs := snapshot.ListSources(s.rep) + srcs, err := snapshot.ListSources(ctx, s.rep) + if err != nil { + return nil, err + } users := map[string]bool{} for _, src := range srcs { diff --git a/fs/repofs/source_directories.go b/fs/repofs/source_directories.go index 03ec403d4..464ab259f 100644 --- a/fs/repofs/source_directories.go +++ b/fs/repofs/source_directories.go @@ -29,7 +29,10 @@ func (s *sourceDirectories) Summary() *fs.DirectorySummary { } func (s *sourceDirectories) Readdir(ctx context.Context) (fs.Entries, error) { - sources := snapshot.ListSources(s.rep) + sources, err := snapshot.ListSources(ctx, s.rep) + if err != nil { + return nil, err + } var result fs.Entries for _, src := range sources { diff --git a/fs/repofs/source_snapshots.go b/fs/repofs/source_snapshots.go index e0f47aad7..ee8b72abd 100644 --- a/fs/repofs/source_snapshots.go +++ b/fs/repofs/source_snapshots.go @@ -34,7 +34,7 @@ func (s *sourceSnapshots) Summary() *fs.DirectorySummary { } func (s *sourceSnapshots) Readdir(ctx context.Context) (fs.Entries, error) { - manifests, err := snapshot.ListSnapshots(s.rep, s.src) + manifests, err := snapshot.ListSnapshots(ctx, s.rep, s.src) if err != nil { return nil, err } diff --git a/internal/server/api_policy_list.go b/internal/server/api_policy_list.go index 06a1c1a22..1015af400 100644 --- a/internal/server/api_policy_list.go +++ b/internal/server/api_policy_list.go @@ -1,14 +1,15 @@ package server import ( + "context" "net/http" "github.com/kopia/kopia/internal/serverapi" "github.com/kopia/kopia/policy" ) -func (s *Server) handlePolicyList(r *http.Request) (interface{}, *apiError) { - policies, err := policy.ListPolicies(s.rep) +func (s *Server) handlePolicyList(ctx context.Context, r *http.Request) (interface{}, *apiError) { + policies, err := policy.ListPolicies(ctx, s.rep) if err != nil { return nil, internalServerError(err) } diff --git a/internal/server/api_snapshot_list.go b/internal/server/api_snapshot_list.go index f19e6c9f8..937bef72d 100644 --- a/internal/server/api_snapshot_list.go +++ b/internal/server/api_snapshot_list.go @@ -1,6 +1,7 @@ package server import ( + "context" "net/http" "net/url" "strings" @@ -27,9 +28,13 @@ type snapshotListResponse struct { Snapshots []*snapshotListEntry `json:"snapshots"` } -func (s *Server) handleSourceSnapshotList(r *http.Request) (interface{}, *apiError) { - manifestIDs := snapshot.ListSnapshotManifests(s.rep, nil) - manifests, err := snapshot.LoadSnapshots(s.rep, manifestIDs) +func (s *Server) handleSourceSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) { + manifestIDs, err := snapshot.ListSnapshotManifests(ctx, s.rep, nil) + if err != nil { + return nil, internalServerError(err) + } + + manifests, err := snapshot.LoadSnapshots(ctx, s.rep, manifestIDs) if err != nil { return nil, internalServerError(err) } @@ -44,7 +49,7 @@ func (s *Server) handleSourceSnapshotList(r *http.Request) (interface{}, *apiErr continue } - pol, _, err := policy.GetEffectivePolicy(s.rep, first.Source) + pol, _, err := policy.GetEffectivePolicy(ctx, s.rep, first.Source) if err == nil { pol.RetentionPolicy.ComputeRetentionReasons(grp) } diff --git a/internal/server/api_sources_list.go b/internal/server/api_sources_list.go index 4bc0a0e0b..e9b8d77fe 100644 --- a/internal/server/api_sources_list.go +++ b/internal/server/api_sources_list.go @@ -1,13 +1,14 @@ package server import ( + "context" "net/http" "sort" "github.com/kopia/kopia/internal/serverapi" ) -func (s *Server) handleSourcesList(r *http.Request) (interface{}, *apiError) { +func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) { resp := &serverapi.SourcesResponse{ Sources: []serverapi.SourceStatus{}, } diff --git a/internal/server/api_status.go b/internal/server/api_status.go index a60f6d79e..02534e42e 100644 --- a/internal/server/api_status.go +++ b/internal/server/api_status.go @@ -1,12 +1,13 @@ package server import ( + "context" "net/http" "github.com/kopia/kopia/internal/serverapi" ) -func (s *Server) handleStatus(r *http.Request) (interface{}, *apiError) { +func (s *Server) handleStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) { bf := s.rep.Blocks.Format bf.HMACSecret = nil bf.MasterKey = nil diff --git a/internal/server/server.go b/internal/server/server.go index e9b3a54f7..5b5a368a6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -3,6 +3,7 @@ import ( "context" "encoding/json" + "fmt" "net/http" "net/url" "sync" @@ -42,7 +43,7 @@ func (s *Server) APIHandlers() http.Handler { return p } -func (s *Server) handleAPI(f func(r *http.Request) (interface{}, *apiError)) http.Handler { +func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { s.mu.Lock() defer s.mu.Unlock() @@ -51,7 +52,7 @@ func (s *Server) handleAPI(f func(r *http.Request) (interface{}, *apiError)) htt e := json.NewEncoder(w) e.SetIndent("", " ") - v, err := f(r) + v, err := f(context.Background(), r) log.Debugf("returned %+v", v) if err == nil { if err := e.Encode(v); err != nil { @@ -64,12 +65,12 @@ func (s *Server) handleAPI(f func(r *http.Request) (interface{}, *apiError)) htt }) } -func (s *Server) handleRefresh(r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRefresh(ctx context.Context, r *http.Request) (interface{}, *apiError) { log.Infof("refreshing") return &serverapi.Empty{}, nil } -func (s *Server) handleFlush(r *http.Request) (interface{}, *apiError) { +func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{}, *apiError) { log.Infof("flushing") return &serverapi.Empty{}, nil } @@ -89,19 +90,19 @@ func (s *Server) forAllSourceManagersMatchingURLFilter(c func(s *sourceManager) return resp, nil } -func (s *Server) handleUpload(r *http.Request) (interface{}, *apiError) { +func (s *Server) handleUpload(ctx context.Context, r *http.Request) (interface{}, *apiError) { return s.forAllSourceManagersMatchingURLFilter((*sourceManager).upload, r.URL.Query()) } -func (s *Server) handlePause(r *http.Request) (interface{}, *apiError) { +func (s *Server) handlePause(ctx context.Context, r *http.Request) (interface{}, *apiError) { return s.forAllSourceManagersMatchingURLFilter((*sourceManager).pause, r.URL.Query()) } -func (s *Server) handleResume(r *http.Request) (interface{}, *apiError) { +func (s *Server) handleResume(ctx context.Context, r *http.Request) (interface{}, *apiError) { return s.forAllSourceManagersMatchingURLFilter((*sourceManager).resume, r.URL.Query()) } -func (s *Server) handleCancel(r *http.Request) (interface{}, *apiError) { +func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{}, *apiError) { return s.forAllSourceManagersMatchingURLFilter((*sourceManager).cancel, r.URL.Query()) } @@ -127,13 +128,18 @@ func New(ctx context.Context, rep *repo.Repository, hostname string, username st uploadSemaphore: make(chan struct{}, 1), } - for _, src := range snapshot.ListSources(rep) { + sources, err := snapshot.ListSources(ctx, rep) + if err != nil { + return nil, fmt.Errorf("unable to list sources: %v", err) + } + + for _, src := range sources { sm := newSourceManager(src, s) s.sourceManagers[src] = sm } for _, src := range s.sourceManagers { - go src.run() + go src.run(ctx) } return s, nil diff --git a/internal/server/source_manager.go b/internal/server/source_manager.go index df35478a8..a71321b50 100644 --- a/internal/server/source_manager.go +++ b/internal/server/source_manager.go @@ -64,19 +64,19 @@ func (s *sourceManager) setStatus(stat string) { s.mu.Unlock() } -func (s *sourceManager) run() { +func (s *sourceManager) run(ctx context.Context) { s.setStatus("INITIALIZING") defer s.setStatus("STOPPED") if s.server.hostname == s.src.Host { - s.runLocal() + s.runLocal(ctx) } else { - s.runRemote() + s.runRemote(ctx) } } -func (s *sourceManager) runLocal() { - s.refreshStatus() +func (s *sourceManager) runLocal(ctx context.Context) { + s.refreshStatus(ctx) for { var timeBeforeNextSnapshot time.Duration if !s.nextSnapshotTime.IsZero() { @@ -92,26 +92,26 @@ func (s *sourceManager) runLocal() { return case <-time.After(15 * time.Second): - s.refreshStatus() + s.refreshStatus(ctx) case <-time.After(timeBeforeNextSnapshot): log.Infof("snapshotting %v", s.src) s.setStatus("SNAPSHOTTING") - s.snapshot() - s.refreshStatus() + s.snapshot(ctx) + s.refreshStatus(ctx) } } } -func (s *sourceManager) runRemote() { - s.refreshStatus() +func (s *sourceManager) runRemote(ctx context.Context) { + s.refreshStatus(ctx) s.setStatus("REMOTE") for { select { case <-s.closed: return case <-time.After(15 * time.Second): - s.refreshStatus() + s.refreshStatus(ctx) } } } @@ -155,7 +155,7 @@ func (s *sourceManager) resume() serverapi.SourceActionResponse { return serverapi.SourceActionResponse{Success: true} } -func (s *sourceManager) snapshot() { +func (s *sourceManager) snapshot(ctx context.Context) { s.server.beginUpload(s.src) defer s.server.endUpload(s.src) @@ -165,13 +165,12 @@ func (s *sourceManager) snapshot() { return } u := upload.NewUploader(s.server.rep) - polGetter, err := policy.FilesPolicyGetter(s.server.rep, s.src) + polGetter, err := policy.FilesPolicyGetter(ctx, s.server.rep, s.src) if err != nil { log.Errorf("unable to create policy getter: %v", err) } u.FilesPolicy = polGetter u.Progress = s - ctx := context.Background() log.Infof("starting upload of %v", s.src) manifest, err := u.Upload(ctx, localEntry, s.src, s.lastSnapshot) @@ -180,7 +179,7 @@ func (s *sourceManager) snapshot() { return } - snapshotID, err := snapshot.SaveSnapshot(s.server.rep, manifest) + snapshotID, err := snapshot.SaveSnapshot(ctx, s.server.rep, manifest) if err != nil { log.Errorf("unable to save snapshot: %v", err) return @@ -220,16 +219,16 @@ func (s *sourceManager) findClosestNextSnapshotTime() time.Time { return nextSnapshotTime } -func (s *sourceManager) refreshStatus() { +func (s *sourceManager) refreshStatus(ctx context.Context) { log.Debugf("refreshing state for %v", s.src) - pol, _, err := policy.GetEffectivePolicy(s.server.rep, s.src) + pol, _, err := policy.GetEffectivePolicy(ctx, s.server.rep, s.src) if err != nil { s.setStatus("FAILED") return } s.pol = pol - snapshots, err := snapshot.ListSnapshots(s.server.rep, s.src) + snapshots, err := snapshot.ListSnapshots(ctx, s.server.rep, s.src) if err != nil { s.setStatus("FAILED") return diff --git a/policy/expire.go b/policy/expire.go index 4998d1f08..fb533da74 100644 --- a/policy/expire.go +++ b/policy/expire.go @@ -1,18 +1,18 @@ package policy import ( + "context" "strings" "github.com/kopia/kopia/repo" - "github.com/kopia/kopia/snapshot" ) // GetExpiredSnapshots computes the set of snapshot manifests that are not retained according to the policy. -func GetExpiredSnapshots(rep *repo.Repository, snapshots []*snapshot.Manifest) ([]*snapshot.Manifest, error) { +func GetExpiredSnapshots(ctx context.Context, rep *repo.Repository, snapshots []*snapshot.Manifest) ([]*snapshot.Manifest, error) { var toDelete []*snapshot.Manifest for _, snapshotGroup := range snapshot.GroupBySource(snapshots) { - td, err := getExpiredSnapshotsForSource(rep, snapshotGroup) + td, err := getExpiredSnapshotsForSource(ctx, rep, snapshotGroup) if err != nil { return nil, err } @@ -21,9 +21,9 @@ func GetExpiredSnapshots(rep *repo.Repository, snapshots []*snapshot.Manifest) ( return toDelete, nil } -func getExpiredSnapshotsForSource(rep *repo.Repository, snapshots []*snapshot.Manifest) ([]*snapshot.Manifest, error) { +func getExpiredSnapshotsForSource(ctx context.Context, rep *repo.Repository, snapshots []*snapshot.Manifest) ([]*snapshot.Manifest, error) { src := snapshots[0].Source - pol, _, err := GetEffectivePolicy(rep, src) + pol, _, err := GetEffectivePolicy(ctx, rep, src) if err != nil { return nil, err } diff --git a/policy/policy_manager.go b/policy/policy_manager.go index 57937b2c0..f2b97a143 100644 --- a/policy/policy_manager.go +++ b/policy/policy_manager.go @@ -2,14 +2,15 @@ package policy import ( + "context" "fmt" "path/filepath" "strings" "github.com/kopia/kopia/fs/ignorefs" "github.com/kopia/kopia/internal/kopialogging" - "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" ) @@ -21,12 +22,15 @@ // GetEffectivePolicy calculates effective snapshot policy for a given source by combining the source-specifc policy (if any) // with parent policies. The source must contain a path. // Returns the effective policies and all source policies that contributed to that (most specific first). -func GetEffectivePolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, []*Policy, error) { +func GetEffectivePolicy(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo) (*Policy, []*Policy, error) { var md []*manifest.EntryMetadata // Find policies applying to paths all the way up to the root. for tmp := si; len(si.Path) > 0; { - manifests := rep.Manifests.Find(labelsForSource(tmp)) + manifests, err := rep.Manifests.Find(ctx, labelsForSource(tmp)) + if err != nil { + return nil, nil, err + } md = append(md, manifests...) parentPath := filepath.Dir(tmp.Path) @@ -38,19 +42,33 @@ func GetEffectivePolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, } // Try user@host policy - md = append(md, rep.Manifests.Find(labelsForSource(snapshot.SourceInfo{Host: si.Host, UserName: si.UserName}))...) + userHostManifests, err := rep.Manifests.Find(ctx, labelsForSource(snapshot.SourceInfo{Host: si.Host, UserName: si.UserName})) + if err != nil { + return nil, nil, err + } + md = append(md, userHostManifests...) // Try host-level policy. - md = append(md, rep.Manifests.Find(labelsForSource(snapshot.SourceInfo{Host: si.Host}))...) + if err != nil { + return nil, nil, err + } + hostManifests, err := rep.Manifests.Find(ctx, labelsForSource(snapshot.SourceInfo{Host: si.Host})) + if err != nil { + return nil, nil, err + } + md = append(md, hostManifests...) // Global policy. - globalManifests := rep.Manifests.Find(labelsForSource(GlobalPolicySourceInfo)) + globalManifests, err := rep.Manifests.Find(ctx, labelsForSource(GlobalPolicySourceInfo)) + if err != nil { + return nil, nil, err + } md = append(md, globalManifests...) var policies []*Policy for _, em := range md { p := &Policy{} - if err := rep.Manifests.Get(em.ID, &p); err != nil { + if err := rep.Manifests.Get(ctx, em.ID, &p); err != nil { return nil, nil, fmt.Errorf("got unexpected error when loading policy item %v: %v", em.ID, err) } p.Labels = em.Labels @@ -65,8 +83,11 @@ func GetEffectivePolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, } // GetDefinedPolicy returns the policy defined on the provided snapshot.SourceInfo or ErrPolicyNotFound if not present. -func GetDefinedPolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, error) { - md := rep.Manifests.Find(labelsForSource(si)) +func GetDefinedPolicy(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo) (*Policy, error) { + md, err := rep.Manifests.Find(ctx, labelsForSource(si)) + if err != nil { + return nil, fmt.Errorf("unable to find policy for source: %v", err) + } if len(md) == 0 { return nil, ErrPolicyNotFound @@ -75,7 +96,7 @@ func GetDefinedPolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, er if len(md) == 1 { p := &Policy{} - err := rep.Manifests.Get(md[0].ID, p) + err := rep.Manifests.Get(ctx, md[0].ID, p) if err == manifest.ErrNotFound { return nil, ErrPolicyNotFound } @@ -84,7 +105,7 @@ func GetDefinedPolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, er return nil, err } - em, err := rep.Manifests.GetMetadata(md[0].ID) + em, err := rep.Manifests.GetMetadata(ctx, md[0].ID) if err != nil { return nil, ErrPolicyNotFound } @@ -97,10 +118,13 @@ func GetDefinedPolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, er } // SetPolicy sets the policy on a given source. -func SetPolicy(rep *repo.Repository, si snapshot.SourceInfo, pol *Policy) error { - md := rep.Manifests.Find(labelsForSource(si)) +func SetPolicy(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo, pol *Policy) error { + md, err := rep.Manifests.Find(ctx, labelsForSource(si)) + if err != nil { + return fmt.Errorf("unable to load manifests for %v: %v", si, err) + } - if _, err := rep.Manifests.Put(labelsForSource(si), pol); err != nil { + if _, err := rep.Manifests.Put(ctx, labelsForSource(si), pol); err != nil { return err } @@ -112,8 +136,12 @@ func SetPolicy(rep *repo.Repository, si snapshot.SourceInfo, pol *Policy) error } // RemovePolicy removes the policy for a given source. -func RemovePolicy(rep *repo.Repository, si snapshot.SourceInfo) error { - md := rep.Manifests.Find(labelsForSource(si)) +func RemovePolicy(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo) error { + md, err := rep.Manifests.Find(ctx, labelsForSource(si)) + if err != nil { + return fmt.Errorf("unable to load manifests for %v: %v", si, err) + } + for _, em := range md { rep.Manifests.Delete(em.ID) } @@ -122,9 +150,9 @@ func RemovePolicy(rep *repo.Repository, si snapshot.SourceInfo) error { } // GetPolicyByID gets the policy for a given unique ID or ErrPolicyNotFound if not found. -func GetPolicyByID(rep *repo.Repository, id string) (*Policy, error) { +func GetPolicyByID(ctx context.Context, rep *repo.Repository, id string) (*Policy, error) { p := &Policy{} - if err := rep.Manifests.Get(id, &p); err != nil { + if err := rep.Manifests.Get(ctx, id, &p); err != nil { if err == manifest.ErrNotFound { return nil, ErrPolicyNotFound } @@ -134,21 +162,24 @@ func GetPolicyByID(rep *repo.Repository, id string) (*Policy, error) { } // ListPolicies returns a list of all policies. -func ListPolicies(rep *repo.Repository) ([]*Policy, error) { - ids := rep.Manifests.Find(map[string]string{ +func ListPolicies(ctx context.Context, rep *repo.Repository) ([]*Policy, error) { + ids, err := rep.Manifests.Find(ctx, map[string]string{ "type": "policy", }) + if err != nil { + return nil, fmt.Errorf("unable to list manifests: %v", err) + } var policies []*Policy for _, id := range ids { pol := &Policy{} - err := rep.Manifests.Get(id.ID, pol) + err := rep.Manifests.Get(ctx, id.ID, pol) if err != nil { return nil, err } - md, err := rep.Manifests.GetMetadata(id.ID) + md, err := rep.Manifests.GetMetadata(ctx, id.ID) if err != nil { return nil, err } @@ -162,10 +193,10 @@ func ListPolicies(rep *repo.Repository) ([]*Policy, error) { } // FilesPolicyGetter returns ignorefs.FilesPolicyGetter for a given source. -func FilesPolicyGetter(rep *repo.Repository, si snapshot.SourceInfo) (ignorefs.FilesPolicyGetter, error) { +func FilesPolicyGetter(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo) (ignorefs.FilesPolicyGetter, error) { result := ignorefs.FilesPolicyMap{} - pol, _, err := GetEffectivePolicy(rep, si) + pol, _, err := GetEffectivePolicy(ctx, rep, si) if err != nil { return nil, err } @@ -173,17 +204,20 @@ func FilesPolicyGetter(rep *repo.Repository, si snapshot.SourceInfo) (ignorefs.F result["."] = &pol.FilesPolicy // Find all policies for this host and user - policies := rep.Manifests.Find(map[string]string{ + policies, err := rep.Manifests.Find(ctx, map[string]string{ "type": "policy", "policyType": "path", "username": si.UserName, "hostname": si.Host, }) + if err != nil { + return nil, fmt.Errorf("unable to find manifests for %v@%v: %v", si.UserName, si.Host, err) + } log.Debugf("found %v policies for %v@%v", si.UserName, si.Host) for _, id := range policies { - em, err := rep.Manifests.GetMetadata(id.ID) + em, err := rep.Manifests.GetMetadata(ctx, id.ID) if err != nil { return nil, err } @@ -201,7 +235,7 @@ func FilesPolicyGetter(rep *repo.Repository, si snapshot.SourceInfo) (ignorefs.F rel = "./" + rel log.Debugf("loading policy for %v (%v)", policyPath, rel) pol := &Policy{} - if err := rep.Manifests.Get(id.ID, pol); err != nil { + if err := rep.Manifests.Get(ctx, id.ID, pol); err != nil { return nil, fmt.Errorf("unable to load policy %v: %v", id.ID, err) } result[rel] = &pol.FilesPolicy diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index 6b8e0d281..1ba98ce84 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -32,6 +32,7 @@ type Manager struct { mu sync.Mutex b *block.Manager + initialized bool pendingEntries map[string]*manifestEntry committedEntries map[string]*manifestEntry @@ -39,10 +40,14 @@ type Manager struct { } // Put serializes the provided payload to JSON and persists it. Returns unique handle that represents the object. -func (m *Manager) Put(labels map[string]string, payload interface{}) (string, error) { +func (m *Manager) Put(ctx context.Context, labels map[string]string, payload interface{}) (string, error) { if labels["type"] == "" { return "", fmt.Errorf("'type' label is required") } + + if err := m.ensureInitialized(ctx); err != nil { + return "", err + } m.mu.Lock() defer m.mu.Unlock() @@ -69,7 +74,11 @@ func (m *Manager) Put(labels map[string]string, payload interface{}) (string, er } // GetMetadata returns metadata about provided manifest item or ErrNotFound if the item can't be found. -func (m *Manager) GetMetadata(id string) (*EntryMetadata, error) { +func (m *Manager) GetMetadata(ctx context.Context, id string) (*EntryMetadata, error) { + if err := m.ensureInitialized(ctx); err != nil { + return nil, err + } + m.mu.Lock() defer m.mu.Unlock() @@ -92,8 +101,12 @@ func (m *Manager) GetMetadata(id string) (*EntryMetadata, error) { // Get retrieves the contents of the provided manifest item by deserializing it as JSON to provided object. // If the manifest is not found, returns ErrNotFound. -func (m *Manager) Get(id string, data interface{}) error { - b, err := m.GetRaw(id) +func (m *Manager) Get(ctx context.Context, id string, data interface{}) error { + if err := m.ensureInitialized(ctx); err != nil { + return err + } + + b, err := m.GetRaw(ctx, id) if err != nil { return err } @@ -106,7 +119,11 @@ func (m *Manager) Get(id string, data interface{}) error { } // GetRaw returns raw contents of the provided manifest (JSON bytes) or ErrNotFound if not found. -func (m *Manager) GetRaw(id string) ([]byte, error) { +func (m *Manager) GetRaw(ctx context.Context, id string) ([]byte, error) { + if err := m.ensureInitialized(ctx); err != nil { + return nil, err + } + m.mu.Lock() defer m.mu.Unlock() @@ -122,7 +139,11 @@ func (m *Manager) GetRaw(id string) ([]byte, error) { } // Find returns the list of EntryMetadata for manifest entries matching all provided labels. -func (m *Manager) Find(labels map[string]string) []*EntryMetadata { +func (m *Manager) Find(ctx context.Context, labels map[string]string) ([]*EntryMetadata, error) { + if err := m.ensureInitialized(ctx); err != nil { + return nil, err + } + m.mu.Lock() defer m.mu.Unlock() @@ -146,7 +167,7 @@ func (m *Manager) Find(labels map[string]string) []*EntryMetadata { sort.Slice(matches, func(i, j int) bool { return matches[i].ModTime.Before(matches[j].ModTime) }) - return matches + return matches, nil } func cloneEntryMetadata(e *manifestEntry) *EntryMetadata { @@ -232,14 +253,14 @@ func (m *Manager) Delete(id string) { // Refresh updates the committed blocks from the underlying storage. func (m *Manager) Refresh(ctx context.Context) error { - return m.loadCommittedBlocks(ctx) -} - -func (m *Manager) loadCommittedBlocks(ctx context.Context) error { - log.Debugf("listing manifest blocks") m.mu.Lock() defer m.mu.Unlock() + return m.loadCommittedBlocksLocked(ctx) +} + +func (m *Manager) loadCommittedBlocksLocked(ctx context.Context) error { + log.Debugf("listing manifest blocks") for { blocks, err := m.b.ListBlocks(manifestBlockPrefix) if err != nil { @@ -451,6 +472,22 @@ func (m *Manager) mergeEntry(e *manifestEntry) { } } +func (m *Manager) ensureInitialized(ctx context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.initialized { + return nil + } + + if err := m.loadCommittedBlocksLocked(ctx); err != nil { + return err + } + + m.initialized = true + return nil +} + func copyLabels(m map[string]string) map[string]string { r := map[string]string{} for k, v := range m { @@ -468,9 +505,5 @@ func NewManager(ctx context.Context, b *block.Manager) (*Manager, error) { committedBlockIDs: map[string]bool{}, } - if err := m.loadCommittedBlocks(ctx); err != nil { - return nil, err - } - return m, nil } diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 673b3eb4c..d26a02cd6 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -8,8 +8,8 @@ "testing" "time" - "github.com/kopia/kopia/repo/internal/storagetesting" "github.com/kopia/kopia/repo/block" + "github.com/kopia/kopia/repo/internal/storagetesting" ) func TestManifest(t *testing.T) { @@ -28,9 +28,9 @@ func TestManifest(t *testing.T) { labels2 := map[string]string{"type": "item", "color": "blue", "shape": "square"} labels3 := map[string]string{"type": "item", "shape": "square", "color": "red"} - id1 := addAndVerify(t, mgr, labels1, item1) - id2 := addAndVerify(t, mgr, labels2, item2) - id3 := addAndVerify(t, mgr, labels3, item3) + id1 := addAndVerify(ctx, t, mgr, labels1, item1) + id2 := addAndVerify(ctx, t, mgr, labels2, item2) + id3 := addAndVerify(ctx, t, mgr, labels3, item3) cases := []struct { criteria map[string]string @@ -46,11 +46,11 @@ func TestManifest(t *testing.T) { // verify before flush for _, tc := range cases { - verifyMatches(t, mgr, tc.criteria, tc.expected) + verifyMatches(ctx, t, mgr, tc.criteria, tc.expected) } - verifyItem(t, mgr, id1, labels1, item1) - verifyItem(t, mgr, id2, labels2, item2) - verifyItem(t, mgr, id3, labels3, item3) + verifyItem(ctx, t, mgr, id1, labels1, item1) + verifyItem(ctx, t, mgr, id2, labels2, item2) + verifyItem(ctx, t, mgr, id3, labels3, item3) if err := mgr.Flush(ctx); err != nil { t.Errorf("flush error: %v", err) @@ -61,11 +61,11 @@ func TestManifest(t *testing.T) { // verify after flush for _, tc := range cases { - verifyMatches(t, mgr, tc.criteria, tc.expected) + verifyMatches(ctx, t, mgr, tc.criteria, tc.expected) } - verifyItem(t, mgr, id1, labels1, item1) - verifyItem(t, mgr, id2, labels2, item2) - verifyItem(t, mgr, id3, labels3, item3) + verifyItem(ctx, t, mgr, id1, labels1, item1) + verifyItem(ctx, t, mgr, id2, labels2, item2) + verifyItem(ctx, t, mgr, id3, labels3, item3) // flush underlying block manager and verify in new manifest manager. mgr.b.Flush(ctx) @@ -74,11 +74,11 @@ func TestManifest(t *testing.T) { t.Fatalf("can't open block manager: %v", setupErr) } for _, tc := range cases { - verifyMatches(t, mgr2, tc.criteria, tc.expected) + verifyMatches(ctx, t, mgr2, tc.criteria, tc.expected) } - verifyItem(t, mgr2, id1, labels1, item1) - verifyItem(t, mgr2, id2, labels2, item2) - verifyItem(t, mgr2, id3, labels3, item3) + verifyItem(ctx, t, mgr2, id1, labels1, item1) + verifyItem(ctx, t, mgr2, id2, labels2, item2) + verifyItem(ctx, t, mgr2, id3, labels3, item3) if err := mgr2.Flush(ctx); err != nil { t.Errorf("flush error: %v", err) } @@ -86,13 +86,13 @@ func TestManifest(t *testing.T) { // delete from one time.Sleep(1 * time.Second) mgr.Delete(id3) - verifyItemNotFound(t, mgr, id3) + verifyItemNotFound(ctx, t, mgr, id3) mgr.Flush(ctx) - verifyItemNotFound(t, mgr, id3) + verifyItemNotFound(ctx, t, mgr, id3) // still found in another - verifyItem(t, mgr2, id3, labels3, item3) - if err := mgr2.loadCommittedBlocks(ctx); err != nil { + verifyItem(ctx, t, mgr2, id3, labels3, item3) + if err := mgr2.loadCommittedBlocksLocked(ctx); err != nil { t.Errorf("unable to load: %v", err) } @@ -115,27 +115,27 @@ func TestManifest(t *testing.T) { t.Fatalf("can't open manager: %v", err) } - verifyItem(t, mgr3, id1, labels1, item1) - verifyItem(t, mgr3, id2, labels2, item2) - verifyItemNotFound(t, mgr3, id3) + verifyItem(ctx, t, mgr3, id1, labels1, item1) + verifyItem(ctx, t, mgr3, id2, labels2, item2) + verifyItemNotFound(ctx, t, mgr3, id3) } -func addAndVerify(t *testing.T, mgr *Manager, labels map[string]string, data map[string]int) string { +func addAndVerify(ctx context.Context, t *testing.T, mgr *Manager, labels map[string]string, data map[string]int) string { t.Helper() - id, err := mgr.Put(labels, data) + id, err := mgr.Put(ctx, labels, data) if err != nil { t.Errorf("unable to add %v (%v): %v", labels, data, err) return "" } - verifyItem(t, mgr, id, labels, data) + verifyItem(ctx, t, mgr, id, labels, data) return id } -func verifyItem(t *testing.T, mgr *Manager, id string, labels map[string]string, data map[string]int) { +func verifyItem(ctx context.Context, t *testing.T, mgr *Manager, id string, labels map[string]string, data map[string]int) { t.Helper() - l, err := mgr.GetMetadata(id) + l, err := mgr.GetMetadata(ctx, id) if err != nil { t.Errorf("unable to retrieve %q: %v", id, err) return @@ -146,21 +146,26 @@ func verifyItem(t *testing.T, mgr *Manager, id string, labels map[string]string, } } -func verifyItemNotFound(t *testing.T, mgr *Manager, id string) { +func verifyItemNotFound(ctx context.Context, t *testing.T, mgr *Manager, id string) { t.Helper() - _, err := mgr.GetMetadata(id) + _, err := mgr.GetMetadata(ctx, id) if got, want := err, ErrNotFound; got != want { t.Errorf("invalid error when getting %q %v, expected %v", id, err, ErrNotFound) return } } -func verifyMatches(t *testing.T, mgr *Manager, labels map[string]string, expected []string) { +func verifyMatches(ctx context.Context, t *testing.T, mgr *Manager, labels map[string]string, expected []string) { t.Helper() var matches []string - for _, m := range mgr.Find(labels) { + items, err := mgr.Find(ctx, labels) + if err != nil { + t.Errorf("error in Find(): %v", err) + return + } + for _, m := range items { matches = append(matches, m.ID) } sort.Strings(matches) diff --git a/repo/tests/repository_stress_test/repository_stress_test.go b/repo/tests/repository_stress_test/repository_stress_test.go index eda69b277..a414136cf 100644 --- a/repo/tests/repository_stress_test/repository_stress_test.go +++ b/repo/tests/repository_stress_test/repository_stress_test.go @@ -280,12 +280,15 @@ func refresh(ctx context.Context, t *testing.T, r *repo.Repository) error { } func readRandomManifest(ctx context.Context, t *testing.T, r *repo.Repository) error { - manifests := r.Manifests.Find(nil) + manifests, err := r.Manifests.Find(ctx, nil) + if err != nil { + return err + } if len(manifests) == 0 { return nil } n := rand.Intn(len(manifests)) - _, err := r.Manifests.GetRaw(manifests[n].ID) + _, err = r.Manifests.GetRaw(ctx, manifests[n].ID) return err } @@ -298,7 +301,7 @@ func writeRandomManifest(ctx context.Context, t *testing.T, r *repo.Repository) content2 := fmt.Sprintf("content-%v", rand.Intn(10)) content1val := fmt.Sprintf("val1-%v", rand.Intn(10)) content2val := fmt.Sprintf("val2-%v", rand.Intn(10)) - _, err := r.Manifests.Put(map[string]string{ + _, err := r.Manifests.Put(ctx, map[string]string{ "type": key1, key1: val1, key2: val2, diff --git a/snapshot/manager.go b/snapshot/manager.go index 1309aa3ff..45df05955 100644 --- a/snapshot/manager.go +++ b/snapshot/manager.go @@ -2,6 +2,9 @@ package snapshot import ( + "context" + "fmt" + "github.com/kopia/kopia/internal/kopialogging" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" @@ -10,10 +13,13 @@ var log = kopialogging.Logger("kopia/snapshot") // ListSources lists all snapshot sources in a given repository. -func ListSources(rep *repo.Repository) []SourceInfo { - items := rep.Manifests.Find(map[string]string{ +func ListSources(ctx context.Context, rep *repo.Repository) ([]SourceInfo, error) { + items, err := rep.Manifests.Find(ctx, map[string]string{ "type": "snapshot", }) + if err != nil { + return nil, fmt.Errorf("unable to find manifest entries: %v", err) + } uniq := map[SourceInfo]bool{} for _, it := range items { @@ -25,7 +31,7 @@ func ListSources(rep *repo.Repository) []SourceInfo { infos = append(infos, k) } - return infos + return infos, nil } func sourceInfoFromLabels(labels map[string]string) SourceInfo { @@ -42,15 +48,19 @@ func sourceInfoToLabels(si SourceInfo) map[string]string { } // ListSnapshots lists all snapshots for a given source. -func ListSnapshots(rep *repo.Repository, si SourceInfo) ([]*Manifest, error) { - return LoadSnapshots(rep, manifest.EntryIDs(rep.Manifests.Find(sourceInfoToLabels(si)))) +func ListSnapshots(ctx context.Context, rep *repo.Repository, si SourceInfo) ([]*Manifest, error) { + entries, err := rep.Manifests.Find(ctx, sourceInfoToLabels(si)) + if err != nil { + return nil, fmt.Errorf("unable to find manifest entries: %v", err) + } + return LoadSnapshots(ctx, rep, manifest.EntryIDs(entries)) } // LoadSnapshot loads and parses a snapshot with a given ID. -func LoadSnapshot(rep *repo.Repository, manifestID string) (*Manifest, error) { +func LoadSnapshot(ctx context.Context, rep *repo.Repository, manifestID string) (*Manifest, error) { sm := &Manifest{} - if err := rep.Manifests.Get(manifestID, sm); err != nil { - return nil, err + if err := rep.Manifests.Get(ctx, manifestID, sm); err != nil { + return nil, fmt.Errorf("unable to find manifest entries: %v", err) } sm.ID = manifestID @@ -59,12 +69,12 @@ func LoadSnapshot(rep *repo.Repository, manifestID string) (*Manifest, error) { } // SaveSnapshot persists given snapshot manifest and returns manifest ID. -func SaveSnapshot(rep *repo.Repository, manifest *Manifest) (string, error) { - return rep.Manifests.Put(sourceInfoToLabels(manifest.Source), manifest) +func SaveSnapshot(ctx context.Context, rep *repo.Repository, manifest *Manifest) (string, error) { + return rep.Manifests.Put(ctx, sourceInfoToLabels(manifest.Source), manifest) } // LoadSnapshots efficiently loads and parses a given list of snapshot IDs. -func LoadSnapshots(rep *repo.Repository, names []string) ([]*Manifest, error) { +func LoadSnapshots(ctx context.Context, rep *repo.Repository, names []string) ([]*Manifest, error) { result := make([]*Manifest, len(names)) sem := make(chan bool, 50) @@ -73,7 +83,7 @@ func LoadSnapshots(rep *repo.Repository, names []string) ([]*Manifest, error) { go func(i int, n string) { defer func() { <-sem }() - m, err := LoadSnapshot(rep, n) + m, err := LoadSnapshot(ctx, rep, n) if err != nil { log.Warningf("unable to parse snapshot manifest %v: %v", n, err) return @@ -98,7 +108,7 @@ func LoadSnapshots(rep *repo.Repository, names []string) ([]*Manifest, error) { } // ListSnapshotManifests returns the list of snapshot manifests for a given source or all sources if nil. -func ListSnapshotManifests(rep *repo.Repository, src *SourceInfo) []string { +func ListSnapshotManifests(ctx context.Context, rep *repo.Repository, src *SourceInfo) ([]string, error) { labels := map[string]string{ "type": "snapshot", } @@ -107,5 +117,10 @@ func ListSnapshotManifests(rep *repo.Repository, src *SourceInfo) []string { labels = sourceInfoToLabels(*src) } - return manifest.EntryIDs(rep.Manifests.Find(labels)) + entries, err := rep.Manifests.Find(ctx, labels) + if err != nil { + return nil, fmt.Errorf("unable to find manifest entries: %v", err) + } + + return manifest.EntryIDs(entries), nil }