plumbed through context to all manifest manager methods and switched to lazy initialization

This commit is contained in:
Jarek Kowalski
2018-09-12 19:15:53 -07:00
parent f2c7c29a57
commit 906b8eaee3
29 changed files with 317 additions and 186 deletions

View File

@@ -31,7 +31,10 @@ func listManifestItems(ctx context.Context, rep *repo.Repository) error {
filter[kv[0:p]] = kv[p+1:]
}
items := rep.Manifests.Find(filter)
items, err := rep.Manifests.Find(ctx, filter)
if err != nil {
return err
}
sort.Slice(items, func(i, j int) bool {
for _, key := range *manifestListSort {

View File

@@ -19,12 +19,12 @@ func init() {
func showManifestItems(ctx context.Context, rep *repo.Repository) error {
for _, it := range *manifestShowItems {
md, err := rep.Manifests.GetMetadata(it)
md, err := rep.Manifests.GetMetadata(ctx, it)
if err != nil {
return fmt.Errorf("error getting metadata for %q: %v", it, err)
}
b, err := rep.Manifests.GetRaw(it)
b, err := rep.Manifests.GetRaw(ctx, it)
if err != nil {
return fmt.Errorf("error showing %q: %v", it, err)
}

View File

@@ -200,7 +200,7 @@ func runVerifyCommand(ctx context.Context, rep *repo.Repository) error {
}
func enqueueRootsToVerify(ctx context.Context, v *verifier, rep *repo.Repository) error {
manifests, err := loadSourceManifests(rep, *verifyCommandAllSources, *verifyCommandSources)
manifests, err := loadSourceManifests(ctx, rep, *verifyCommandAllSources, *verifyCommandSources)
if err != nil {
return err
}
@@ -239,20 +239,28 @@ func enqueueRootsToVerify(ctx context.Context, v *verifier, rep *repo.Repository
return nil
}
func loadSourceManifests(rep *repo.Repository, all bool, sources []string) ([]*snapshot.Manifest, error) {
func loadSourceManifests(ctx context.Context, rep *repo.Repository, all bool, sources []string) ([]*snapshot.Manifest, error) {
var manifestIDs []string
if *verifyCommandAllSources {
manifestIDs = append(manifestIDs, snapshot.ListSnapshotManifests(rep, nil)...)
man, err := snapshot.ListSnapshotManifests(ctx, rep, nil)
if err != nil {
return nil, err
}
manifestIDs = append(manifestIDs, man...)
} else {
for _, srcStr := range *verifyCommandSources {
src, err := snapshot.ParseSourceInfo(srcStr, getHostName(), getUserName())
if err != nil {
return nil, fmt.Errorf("error parsing %q: %v", srcStr, err)
}
manifestIDs = append(manifestIDs, snapshot.ListSnapshotManifests(rep, &src)...)
man, err := snapshot.ListSnapshotManifests(ctx, rep, &src)
if err != nil {
return nil, err
}
manifestIDs = append(manifestIDs, man...)
}
}
return snapshot.LoadSnapshots(rep, manifestIDs)
return snapshot.LoadSnapshots(ctx, rep, manifestIDs)
}
func init() {

View File

@@ -1,6 +1,7 @@
package cli
import (
"context"
"fmt"
"github.com/kopia/kopia/policy"
@@ -8,7 +9,7 @@
"github.com/kopia/kopia/snapshot"
)
func policyTargets(rep *repo.Repository, globalFlag *bool, targetsFlag *[]string) ([]snapshot.SourceInfo, error) {
func policyTargets(ctx context.Context, rep *repo.Repository, globalFlag *bool, targetsFlag *[]string) ([]snapshot.SourceInfo, error) {
if *globalFlag == (len(*targetsFlag) > 0) {
return nil, fmt.Errorf("must pass either '--global' or a list of path targets")
}
@@ -21,7 +22,7 @@ func policyTargets(rep *repo.Repository, globalFlag *bool, targetsFlag *[]string
var res []snapshot.SourceInfo
for _, ts := range *targetsFlag {
if t, err := policy.GetPolicyByID(rep, ts); err == nil {
if t, err := policy.GetPolicyByID(ctx, rep, ts); err == nil {
res = append(res, t.Target())
continue
}

View File

@@ -57,13 +57,13 @@ func init() {
}
func editPolicy(ctx context.Context, rep *repo.Repository) error {
targets, err := policyTargets(rep, policyEditGlobal, policyEditTargets)
targets, err := policyTargets(ctx, rep, policyEditGlobal, policyEditTargets)
if err != nil {
return err
}
for _, target := range targets {
original, err := policy.GetDefinedPolicy(rep, target)
original, err := policy.GetDefinedPolicy(ctx, rep, target)
if err == policy.ErrPolicyNotFound {
original = &policy.Policy{}
}
@@ -98,7 +98,7 @@ func editPolicy(ctx context.Context, rep *repo.Repository) error {
fmt.Scanf("%v", &shouldSave) //nolint:errcheck
if strings.HasPrefix(strings.ToLower(shouldSave), "y") {
if err := policy.SetPolicy(rep, target, updated); err != nil {
if err := policy.SetPolicy(ctx, rep, target, updated); err != nil {
return fmt.Errorf("can't save policy for %v: %v", target, err)
}
}

View File

@@ -18,7 +18,7 @@ func init() {
}
func listPolicies(ctx context.Context, rep *repo.Repository) error {
policies, err := policy.ListPolicies(rep)
policies, err := policy.ListPolicies(ctx, rep)
if err != nil {
return err
}

View File

@@ -18,14 +18,14 @@ func init() {
}
func removePolicy(ctx context.Context, rep *repo.Repository) error {
targets, err := policyTargets(rep, policyRemoveGlobal, policyRemoveTargets)
targets, err := policyTargets(ctx, rep, policyRemoveGlobal, policyRemoveTargets)
if err != nil {
return err
}
for _, target := range targets {
log.Infof("Removing policy on %q...", target)
if err := policy.RemovePolicy(rep, target); err != nil {
if err := policy.RemovePolicy(ctx, rep, target); err != nil {
return err
}
}

View File

@@ -54,13 +54,13 @@ func init() {
}
func setPolicy(ctx context.Context, rep *repo.Repository) error {
targets, err := policyTargets(rep, policySetGlobal, policySetTargets)
targets, err := policyTargets(ctx, rep, policySetGlobal, policySetTargets)
if err != nil {
return err
}
for _, target := range targets {
p, err := policy.GetDefinedPolicy(rep, target)
p, err := policy.GetDefinedPolicy(ctx, rep, target)
if err == policy.ErrPolicyNotFound {
p = &policy.Policy{}
}
@@ -76,7 +76,7 @@ func setPolicy(ctx context.Context, rep *repo.Repository) error {
return fmt.Errorf("no changes specified")
}
if err := policy.SetPolicy(rep, target, p); err != nil {
if err := policy.SetPolicy(ctx, rep, target, p); err != nil {
return fmt.Errorf("can't save policy for %v: %v", target, err)
}
}

View File

@@ -21,13 +21,13 @@ func init() {
}
func showPolicy(ctx context.Context, rep *repo.Repository) error {
targets, err := policyTargets(rep, policyShowGlobal, policyShowTargets)
targets, err := policyTargets(ctx, rep, policyShowGlobal, policyShowTargets)
if err != nil {
return err
}
for _, target := range targets {
effective, policies, err := policy.GetEffectivePolicy(rep, target)
effective, policies, err := policy.GetEffectivePolicy(ctx, rep, target)
if err != nil {
return fmt.Errorf("can't get effective policy for %q: %v", target, err)
}

View File

@@ -32,7 +32,7 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error {
return fmt.Errorf("can't open source repository: %v", err)
}
sources, err := getSourcesToMigrate(sourceRepo)
sources, err := getSourcesToMigrate(ctx, sourceRepo)
if err != nil {
return fmt.Errorf("can't retrieve sources: %v", err)
}
@@ -68,8 +68,11 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error {
func migrateSingleSource(ctx context.Context, uploader *upload.Uploader, sourceRepo, destRepo *repo.Repository, s snapshot.SourceInfo) error {
log.Debugf("migrating source %v", s)
manifests := snapshot.ListSnapshotManifests(sourceRepo, &s)
snapshots, err := snapshot.LoadSnapshots(sourceRepo, manifests)
manifests, err := snapshot.ListSnapshotManifests(ctx, sourceRepo, &s)
if err != nil {
return err
}
snapshots, err := snapshot.LoadSnapshots(ctx, sourceRepo, manifests)
if err != nil {
return fmt.Errorf("unable to load snapshot manifests for %v: %v", s, err)
}
@@ -86,7 +89,7 @@ func migrateSingleSource(ctx context.Context, uploader *upload.Uploader, sourceR
m.Stats = newm.Stats
m.IncompleteReason = newm.IncompleteReason
if _, err := snapshot.SaveSnapshot(destRepo, m); err != nil {
if _, err := snapshot.SaveSnapshot(ctx, destRepo, m); err != nil {
return fmt.Errorf("cannot save manifest: %v", err)
}
}
@@ -101,7 +104,7 @@ func filterSnapshotsToMigrate(s []*snapshot.Manifest) []*snapshot.Manifest {
return s
}
func getSourcesToMigrate(rep *repo.Repository) ([]snapshot.SourceInfo, error) {
func getSourcesToMigrate(ctx context.Context, rep *repo.Repository) ([]snapshot.SourceInfo, error) {
if len(*migrateSources) > 0 {
var result []snapshot.SourceInfo
@@ -118,7 +121,7 @@ func getSourcesToMigrate(rep *repo.Repository) ([]snapshot.SourceInfo, error) {
}
if *migrateAll {
return snapshot.ListSources(rep), nil
return snapshot.ListSources(ctx, rep)
}
return nil, nil

View File

@@ -37,7 +37,7 @@
func runBackupCommand(ctx context.Context, rep *repo.Repository) error {
sources := *snapshotCreateSources
if *snapshotCreateAll {
local, err := getLocalBackupPaths(rep)
local, err := getLocalBackupPaths(ctx, rep)
if err != nil {
return err
}
@@ -90,12 +90,12 @@ func snapshotSingleSource(ctx context.Context, rep *repo.Repository, u *upload.U
localEntry := mustGetLocalFSEntry(sourceInfo.Path)
previousManifest, err := findPreviousSnapshotManifest(rep, sourceInfo)
previousManifest, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo)
if err != nil {
return err
}
u.FilesPolicy, err = policy.FilesPolicyGetter(rep, sourceInfo)
u.FilesPolicy, err = policy.FilesPolicyGetter(ctx, rep, sourceInfo)
if err != nil {
return err
}
@@ -108,7 +108,7 @@ func snapshotSingleSource(ctx context.Context, rep *repo.Repository, u *upload.U
manifest.Description = *snapshotCreateDescription
snapID, err := snapshot.SaveSnapshot(rep, manifest)
snapID, err := snapshot.SaveSnapshot(ctx, rep, manifest)
if err != nil {
return fmt.Errorf("cannot save manifest: %v", err)
}
@@ -122,8 +122,8 @@ func snapshotSingleSource(ctx context.Context, rep *repo.Repository, u *upload.U
return nil
}
func findPreviousSnapshotManifest(rep *repo.Repository, sourceInfo snapshot.SourceInfo) (*snapshot.Manifest, error) {
previous, err := snapshot.ListSnapshots(rep, sourceInfo)
func findPreviousSnapshotManifest(ctx context.Context, rep *repo.Repository, sourceInfo snapshot.SourceInfo) (*snapshot.Manifest, error) {
previous, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
if err != nil {
return nil, fmt.Errorf("error listing previous backups: %v", err)
}
@@ -144,12 +144,15 @@ func findPreviousSnapshotManifest(rep *repo.Repository, sourceInfo snapshot.Sour
return previousManifest, nil
}
func getLocalBackupPaths(rep *repo.Repository) ([]string, error) {
func getLocalBackupPaths(ctx context.Context, rep *repo.Repository) ([]string, error) {
h := getHostName()
u := getUserName()
log.Debugf("Looking for previous backups of '%v@%v'...", u, h)
sources := snapshot.ListSources(rep)
sources, err := snapshot.ListSources(ctx, rep)
if err != nil {
return nil, fmt.Errorf("unable to list sources: %v", err)
}
var result []string

View File

@@ -86,7 +86,7 @@ func runSnapshotEstimateCommand(ctx context.Context, rep *repo.Repository) error
entry := mustGetLocalFSEntry(path)
if dir, ok := entry.(fs.Directory); ok {
ignorePolicy, err := policy.FilesPolicyGetter(rep, sourceInfo)
ignorePolicy, err := policy.FilesPolicyGetter(ctx, rep, sourceInfo)
if err != nil {
return err
}

View File

@@ -19,14 +19,14 @@
snapshotExpireDelete = snapshotExpireCommand.Flag("delete", "Whether to actually delete snapshots").Default("no").String()
)
func getSnapshotNamesToExpire(rep *repo.Repository) ([]string, error) {
func getSnapshotNamesToExpire(ctx context.Context, rep *repo.Repository) ([]string, error) {
if !*snapshotExpireAll && len(*snapshotExpirePaths) == 0 {
return nil, fmt.Errorf("Must specify paths to expire or --all")
}
if *snapshotExpireAll {
printStderr("Scanning all active snapshots...\n")
return snapshot.ListSnapshotManifests(rep, nil), nil
return snapshot.ListSnapshotManifests(ctx, rep, nil)
}
var result []string
@@ -39,7 +39,7 @@ func getSnapshotNamesToExpire(rep *repo.Repository) ([]string, error) {
log.Debugf("Looking for snapshots of %v", src)
matches := snapshot.ListSnapshotManifests(rep, &src)
matches, err := snapshot.ListSnapshotManifests(ctx, rep, &src)
if err != nil {
return nil, fmt.Errorf("error listing snapshots for %v: %v", src, err)
}
@@ -53,17 +53,17 @@ func getSnapshotNamesToExpire(rep *repo.Repository) ([]string, error) {
}
func runExpireCommand(ctx context.Context, rep *repo.Repository) error {
snapshotNames, err := getSnapshotNamesToExpire(rep)
snapshotNames, err := getSnapshotNamesToExpire(ctx, rep)
if err != nil {
return err
}
snapshots, err := snapshot.LoadSnapshots(rep, snapshotNames)
snapshots, err := snapshot.LoadSnapshots(ctx, rep, snapshotNames)
if err != nil {
return err
}
snapshots = filterHostAndUser(snapshots)
toDelete, err := policy.GetExpiredSnapshots(rep, snapshots)
toDelete, err := policy.GetExpiredSnapshots(ctx, rep, snapshots)
if err != nil {
return err
}

View File

@@ -29,9 +29,12 @@
maxResultsPerPath = snapshotListCommand.Flag("max-results", "Maximum number of results.").Default("1000").Int()
)
func findSnapshotsForSource(rep *repo.Repository, sourceInfo snapshot.SourceInfo) (manifestIDs []string, relPath string, err error) {
func findSnapshotsForSource(ctx context.Context, rep *repo.Repository, sourceInfo snapshot.SourceInfo) (manifestIDs []string, relPath string, err error) {
for len(sourceInfo.Path) > 0 {
list := snapshot.ListSnapshotManifests(rep, &sourceInfo)
list, err := snapshot.ListSnapshotManifests(ctx, rep, &sourceInfo)
if err != nil {
return nil, "", err
}
if len(list) > 0 {
return list, relPath, nil
@@ -55,9 +58,10 @@ func findSnapshotsForSource(rep *repo.Repository, sourceInfo snapshot.SourceInfo
return nil, "", nil
}
func findManifestIDs(rep *repo.Repository, source string) ([]string, string, error) {
func findManifestIDs(ctx context.Context, rep *repo.Repository, source string) ([]string, string, error) {
if source == "" {
return snapshot.ListSnapshotManifests(rep, nil), "", nil
man, err := snapshot.ListSnapshotManifests(ctx, rep, nil)
return man, "", err
}
si, err := snapshot.ParseSourceInfo(source, getHostName(), getUserName())
@@ -65,7 +69,7 @@ func findManifestIDs(rep *repo.Repository, source string) ([]string, string, err
return nil, "", fmt.Errorf("invalid directory: '%s': %s", source, err)
}
manifestIDs, relPath, err := findSnapshotsForSource(rep, si)
manifestIDs, relPath, err := findSnapshotsForSource(ctx, rep, si)
if relPath != "" {
relPath = "/" + relPath
}
@@ -74,12 +78,12 @@ func findManifestIDs(rep *repo.Repository, source string) ([]string, string, err
}
func runSnapshotsCommand(ctx context.Context, rep *repo.Repository) error {
manifestIDs, relPath, err := findManifestIDs(rep, *snapshotListPath)
manifestIDs, relPath, err := findManifestIDs(ctx, rep, *snapshotListPath)
if err != nil {
return err
}
manifests, err := snapshot.LoadSnapshots(rep, manifestIDs)
manifests, err := snapshot.LoadSnapshots(ctx, rep, manifestIDs)
if err != nil {
return err
}
@@ -94,7 +98,7 @@ func outputManifestGroups(ctx context.Context, rep *repo.Repository, manifests [
fmt.Printf("%v%v\n", separator, src)
separator = "\n"
pol, _, err := policy.GetEffectivePolicy(rep, src)
pol, _, err := policy.GetEffectivePolicy(ctx, rep, src)
if err != nil {
log.Warningf("unable to determine effective policy for %v", src)
} else {

View File

@@ -30,7 +30,10 @@ func (s *repositoryAllSources) Metadata() *fs.EntryMetadata {
}
func (s *repositoryAllSources) Readdir(ctx context.Context) (fs.Entries, error) {
srcs := snapshot.ListSources(s.rep)
srcs, err := snapshot.ListSources(ctx, s.rep)
if err != nil {
return nil, err
}
users := map[string]bool{}
for _, src := range srcs {

View File

@@ -29,7 +29,10 @@ func (s *sourceDirectories) Summary() *fs.DirectorySummary {
}
func (s *sourceDirectories) Readdir(ctx context.Context) (fs.Entries, error) {
sources := snapshot.ListSources(s.rep)
sources, err := snapshot.ListSources(ctx, s.rep)
if err != nil {
return nil, err
}
var result fs.Entries
for _, src := range sources {

View File

@@ -34,7 +34,7 @@ func (s *sourceSnapshots) Summary() *fs.DirectorySummary {
}
func (s *sourceSnapshots) Readdir(ctx context.Context) (fs.Entries, error) {
manifests, err := snapshot.ListSnapshots(s.rep, s.src)
manifests, err := snapshot.ListSnapshots(ctx, s.rep, s.src)
if err != nil {
return nil, err
}

View File

@@ -1,14 +1,15 @@
package server
import (
"context"
"net/http"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/policy"
)
func (s *Server) handlePolicyList(r *http.Request) (interface{}, *apiError) {
policies, err := policy.ListPolicies(s.rep)
func (s *Server) handlePolicyList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
policies, err := policy.ListPolicies(ctx, s.rep)
if err != nil {
return nil, internalServerError(err)
}

View File

@@ -1,6 +1,7 @@
package server
import (
"context"
"net/http"
"net/url"
"strings"
@@ -27,9 +28,13 @@ type snapshotListResponse struct {
Snapshots []*snapshotListEntry `json:"snapshots"`
}
func (s *Server) handleSourceSnapshotList(r *http.Request) (interface{}, *apiError) {
manifestIDs := snapshot.ListSnapshotManifests(s.rep, nil)
manifests, err := snapshot.LoadSnapshots(s.rep, manifestIDs)
func (s *Server) handleSourceSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
manifestIDs, err := snapshot.ListSnapshotManifests(ctx, s.rep, nil)
if err != nil {
return nil, internalServerError(err)
}
manifests, err := snapshot.LoadSnapshots(ctx, s.rep, manifestIDs)
if err != nil {
return nil, internalServerError(err)
}
@@ -44,7 +49,7 @@ func (s *Server) handleSourceSnapshotList(r *http.Request) (interface{}, *apiErr
continue
}
pol, _, err := policy.GetEffectivePolicy(s.rep, first.Source)
pol, _, err := policy.GetEffectivePolicy(ctx, s.rep, first.Source)
if err == nil {
pol.RetentionPolicy.ComputeRetentionReasons(grp)
}

View File

@@ -1,13 +1,14 @@
package server
import (
"context"
"net/http"
"sort"
"github.com/kopia/kopia/internal/serverapi"
)
func (s *Server) handleSourcesList(r *http.Request) (interface{}, *apiError) {
func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
resp := &serverapi.SourcesResponse{
Sources: []serverapi.SourceStatus{},
}

View File

@@ -1,12 +1,13 @@
package server
import (
"context"
"net/http"
"github.com/kopia/kopia/internal/serverapi"
)
func (s *Server) handleStatus(r *http.Request) (interface{}, *apiError) {
func (s *Server) handleStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) {
bf := s.rep.Blocks.Format
bf.HMACSecret = nil
bf.MasterKey = nil

View File

@@ -3,6 +3,7 @@
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sync"
@@ -42,7 +43,7 @@ func (s *Server) APIHandlers() http.Handler {
return p
}
func (s *Server) handleAPI(f func(r *http.Request) (interface{}, *apiError)) http.Handler {
func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -51,7 +52,7 @@ func (s *Server) handleAPI(f func(r *http.Request) (interface{}, *apiError)) htt
e := json.NewEncoder(w)
e.SetIndent("", " ")
v, err := f(r)
v, err := f(context.Background(), r)
log.Debugf("returned %+v", v)
if err == nil {
if err := e.Encode(v); err != nil {
@@ -64,12 +65,12 @@ func (s *Server) handleAPI(f func(r *http.Request) (interface{}, *apiError)) htt
})
}
func (s *Server) handleRefresh(r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRefresh(ctx context.Context, r *http.Request) (interface{}, *apiError) {
log.Infof("refreshing")
return &serverapi.Empty{}, nil
}
func (s *Server) handleFlush(r *http.Request) (interface{}, *apiError) {
func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{}, *apiError) {
log.Infof("flushing")
return &serverapi.Empty{}, nil
}
@@ -89,19 +90,19 @@ func (s *Server) forAllSourceManagersMatchingURLFilter(c func(s *sourceManager)
return resp, nil
}
func (s *Server) handleUpload(r *http.Request) (interface{}, *apiError) {
func (s *Server) handleUpload(ctx context.Context, r *http.Request) (interface{}, *apiError) {
return s.forAllSourceManagersMatchingURLFilter((*sourceManager).upload, r.URL.Query())
}
func (s *Server) handlePause(r *http.Request) (interface{}, *apiError) {
func (s *Server) handlePause(ctx context.Context, r *http.Request) (interface{}, *apiError) {
return s.forAllSourceManagersMatchingURLFilter((*sourceManager).pause, r.URL.Query())
}
func (s *Server) handleResume(r *http.Request) (interface{}, *apiError) {
func (s *Server) handleResume(ctx context.Context, r *http.Request) (interface{}, *apiError) {
return s.forAllSourceManagersMatchingURLFilter((*sourceManager).resume, r.URL.Query())
}
func (s *Server) handleCancel(r *http.Request) (interface{}, *apiError) {
func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{}, *apiError) {
return s.forAllSourceManagersMatchingURLFilter((*sourceManager).cancel, r.URL.Query())
}
@@ -127,13 +128,18 @@ func New(ctx context.Context, rep *repo.Repository, hostname string, username st
uploadSemaphore: make(chan struct{}, 1),
}
for _, src := range snapshot.ListSources(rep) {
sources, err := snapshot.ListSources(ctx, rep)
if err != nil {
return nil, fmt.Errorf("unable to list sources: %v", err)
}
for _, src := range sources {
sm := newSourceManager(src, s)
s.sourceManagers[src] = sm
}
for _, src := range s.sourceManagers {
go src.run()
go src.run(ctx)
}
return s, nil

View File

@@ -64,19 +64,19 @@ func (s *sourceManager) setStatus(stat string) {
s.mu.Unlock()
}
func (s *sourceManager) run() {
func (s *sourceManager) run(ctx context.Context) {
s.setStatus("INITIALIZING")
defer s.setStatus("STOPPED")
if s.server.hostname == s.src.Host {
s.runLocal()
s.runLocal(ctx)
} else {
s.runRemote()
s.runRemote(ctx)
}
}
func (s *sourceManager) runLocal() {
s.refreshStatus()
func (s *sourceManager) runLocal(ctx context.Context) {
s.refreshStatus(ctx)
for {
var timeBeforeNextSnapshot time.Duration
if !s.nextSnapshotTime.IsZero() {
@@ -92,26 +92,26 @@ func (s *sourceManager) runLocal() {
return
case <-time.After(15 * time.Second):
s.refreshStatus()
s.refreshStatus(ctx)
case <-time.After(timeBeforeNextSnapshot):
log.Infof("snapshotting %v", s.src)
s.setStatus("SNAPSHOTTING")
s.snapshot()
s.refreshStatus()
s.snapshot(ctx)
s.refreshStatus(ctx)
}
}
}
func (s *sourceManager) runRemote() {
s.refreshStatus()
func (s *sourceManager) runRemote(ctx context.Context) {
s.refreshStatus(ctx)
s.setStatus("REMOTE")
for {
select {
case <-s.closed:
return
case <-time.After(15 * time.Second):
s.refreshStatus()
s.refreshStatus(ctx)
}
}
}
@@ -155,7 +155,7 @@ func (s *sourceManager) resume() serverapi.SourceActionResponse {
return serverapi.SourceActionResponse{Success: true}
}
func (s *sourceManager) snapshot() {
func (s *sourceManager) snapshot(ctx context.Context) {
s.server.beginUpload(s.src)
defer s.server.endUpload(s.src)
@@ -165,13 +165,12 @@ func (s *sourceManager) snapshot() {
return
}
u := upload.NewUploader(s.server.rep)
polGetter, err := policy.FilesPolicyGetter(s.server.rep, s.src)
polGetter, err := policy.FilesPolicyGetter(ctx, s.server.rep, s.src)
if err != nil {
log.Errorf("unable to create policy getter: %v", err)
}
u.FilesPolicy = polGetter
u.Progress = s
ctx := context.Background()
log.Infof("starting upload of %v", s.src)
manifest, err := u.Upload(ctx, localEntry, s.src, s.lastSnapshot)
@@ -180,7 +179,7 @@ func (s *sourceManager) snapshot() {
return
}
snapshotID, err := snapshot.SaveSnapshot(s.server.rep, manifest)
snapshotID, err := snapshot.SaveSnapshot(ctx, s.server.rep, manifest)
if err != nil {
log.Errorf("unable to save snapshot: %v", err)
return
@@ -220,16 +219,16 @@ func (s *sourceManager) findClosestNextSnapshotTime() time.Time {
return nextSnapshotTime
}
func (s *sourceManager) refreshStatus() {
func (s *sourceManager) refreshStatus(ctx context.Context) {
log.Debugf("refreshing state for %v", s.src)
pol, _, err := policy.GetEffectivePolicy(s.server.rep, s.src)
pol, _, err := policy.GetEffectivePolicy(ctx, s.server.rep, s.src)
if err != nil {
s.setStatus("FAILED")
return
}
s.pol = pol
snapshots, err := snapshot.ListSnapshots(s.server.rep, s.src)
snapshots, err := snapshot.ListSnapshots(ctx, s.server.rep, s.src)
if err != nil {
s.setStatus("FAILED")
return

View File

@@ -1,18 +1,18 @@
package policy
import (
"context"
"strings"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
)
// GetExpiredSnapshots computes the set of snapshot manifests that are not retained according to the policy.
func GetExpiredSnapshots(rep *repo.Repository, snapshots []*snapshot.Manifest) ([]*snapshot.Manifest, error) {
func GetExpiredSnapshots(ctx context.Context, rep *repo.Repository, snapshots []*snapshot.Manifest) ([]*snapshot.Manifest, error) {
var toDelete []*snapshot.Manifest
for _, snapshotGroup := range snapshot.GroupBySource(snapshots) {
td, err := getExpiredSnapshotsForSource(rep, snapshotGroup)
td, err := getExpiredSnapshotsForSource(ctx, rep, snapshotGroup)
if err != nil {
return nil, err
}
@@ -21,9 +21,9 @@ func GetExpiredSnapshots(rep *repo.Repository, snapshots []*snapshot.Manifest) (
return toDelete, nil
}
func getExpiredSnapshotsForSource(rep *repo.Repository, snapshots []*snapshot.Manifest) ([]*snapshot.Manifest, error) {
func getExpiredSnapshotsForSource(ctx context.Context, rep *repo.Repository, snapshots []*snapshot.Manifest) ([]*snapshot.Manifest, error) {
src := snapshots[0].Source
pol, _, err := GetEffectivePolicy(rep, src)
pol, _, err := GetEffectivePolicy(ctx, rep, src)
if err != nil {
return nil, err
}

View File

@@ -2,14 +2,15 @@
package policy
import (
"context"
"fmt"
"path/filepath"
"strings"
"github.com/kopia/kopia/fs/ignorefs"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
)
@@ -21,12 +22,15 @@
// GetEffectivePolicy calculates effective snapshot policy for a given source by combining the source-specifc policy (if any)
// with parent policies. The source must contain a path.
// Returns the effective policies and all source policies that contributed to that (most specific first).
func GetEffectivePolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, []*Policy, error) {
func GetEffectivePolicy(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo) (*Policy, []*Policy, error) {
var md []*manifest.EntryMetadata
// Find policies applying to paths all the way up to the root.
for tmp := si; len(si.Path) > 0; {
manifests := rep.Manifests.Find(labelsForSource(tmp))
manifests, err := rep.Manifests.Find(ctx, labelsForSource(tmp))
if err != nil {
return nil, nil, err
}
md = append(md, manifests...)
parentPath := filepath.Dir(tmp.Path)
@@ -38,19 +42,33 @@ func GetEffectivePolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy,
}
// Try user@host policy
md = append(md, rep.Manifests.Find(labelsForSource(snapshot.SourceInfo{Host: si.Host, UserName: si.UserName}))...)
userHostManifests, err := rep.Manifests.Find(ctx, labelsForSource(snapshot.SourceInfo{Host: si.Host, UserName: si.UserName}))
if err != nil {
return nil, nil, err
}
md = append(md, userHostManifests...)
// Try host-level policy.
md = append(md, rep.Manifests.Find(labelsForSource(snapshot.SourceInfo{Host: si.Host}))...)
if err != nil {
return nil, nil, err
}
hostManifests, err := rep.Manifests.Find(ctx, labelsForSource(snapshot.SourceInfo{Host: si.Host}))
if err != nil {
return nil, nil, err
}
md = append(md, hostManifests...)
// Global policy.
globalManifests := rep.Manifests.Find(labelsForSource(GlobalPolicySourceInfo))
globalManifests, err := rep.Manifests.Find(ctx, labelsForSource(GlobalPolicySourceInfo))
if err != nil {
return nil, nil, err
}
md = append(md, globalManifests...)
var policies []*Policy
for _, em := range md {
p := &Policy{}
if err := rep.Manifests.Get(em.ID, &p); err != nil {
if err := rep.Manifests.Get(ctx, em.ID, &p); err != nil {
return nil, nil, fmt.Errorf("got unexpected error when loading policy item %v: %v", em.ID, err)
}
p.Labels = em.Labels
@@ -65,8 +83,11 @@ func GetEffectivePolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy,
}
// GetDefinedPolicy returns the policy defined on the provided snapshot.SourceInfo or ErrPolicyNotFound if not present.
func GetDefinedPolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, error) {
md := rep.Manifests.Find(labelsForSource(si))
func GetDefinedPolicy(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo) (*Policy, error) {
md, err := rep.Manifests.Find(ctx, labelsForSource(si))
if err != nil {
return nil, fmt.Errorf("unable to find policy for source: %v", err)
}
if len(md) == 0 {
return nil, ErrPolicyNotFound
@@ -75,7 +96,7 @@ func GetDefinedPolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, er
if len(md) == 1 {
p := &Policy{}
err := rep.Manifests.Get(md[0].ID, p)
err := rep.Manifests.Get(ctx, md[0].ID, p)
if err == manifest.ErrNotFound {
return nil, ErrPolicyNotFound
}
@@ -84,7 +105,7 @@ func GetDefinedPolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, er
return nil, err
}
em, err := rep.Manifests.GetMetadata(md[0].ID)
em, err := rep.Manifests.GetMetadata(ctx, md[0].ID)
if err != nil {
return nil, ErrPolicyNotFound
}
@@ -97,10 +118,13 @@ func GetDefinedPolicy(rep *repo.Repository, si snapshot.SourceInfo) (*Policy, er
}
// SetPolicy sets the policy on a given source.
func SetPolicy(rep *repo.Repository, si snapshot.SourceInfo, pol *Policy) error {
md := rep.Manifests.Find(labelsForSource(si))
func SetPolicy(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo, pol *Policy) error {
md, err := rep.Manifests.Find(ctx, labelsForSource(si))
if err != nil {
return fmt.Errorf("unable to load manifests for %v: %v", si, err)
}
if _, err := rep.Manifests.Put(labelsForSource(si), pol); err != nil {
if _, err := rep.Manifests.Put(ctx, labelsForSource(si), pol); err != nil {
return err
}
@@ -112,8 +136,12 @@ func SetPolicy(rep *repo.Repository, si snapshot.SourceInfo, pol *Policy) error
}
// RemovePolicy removes the policy for a given source.
func RemovePolicy(rep *repo.Repository, si snapshot.SourceInfo) error {
md := rep.Manifests.Find(labelsForSource(si))
func RemovePolicy(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo) error {
md, err := rep.Manifests.Find(ctx, labelsForSource(si))
if err != nil {
return fmt.Errorf("unable to load manifests for %v: %v", si, err)
}
for _, em := range md {
rep.Manifests.Delete(em.ID)
}
@@ -122,9 +150,9 @@ func RemovePolicy(rep *repo.Repository, si snapshot.SourceInfo) error {
}
// GetPolicyByID gets the policy for a given unique ID or ErrPolicyNotFound if not found.
func GetPolicyByID(rep *repo.Repository, id string) (*Policy, error) {
func GetPolicyByID(ctx context.Context, rep *repo.Repository, id string) (*Policy, error) {
p := &Policy{}
if err := rep.Manifests.Get(id, &p); err != nil {
if err := rep.Manifests.Get(ctx, id, &p); err != nil {
if err == manifest.ErrNotFound {
return nil, ErrPolicyNotFound
}
@@ -134,21 +162,24 @@ func GetPolicyByID(rep *repo.Repository, id string) (*Policy, error) {
}
// ListPolicies returns a list of all policies.
func ListPolicies(rep *repo.Repository) ([]*Policy, error) {
ids := rep.Manifests.Find(map[string]string{
func ListPolicies(ctx context.Context, rep *repo.Repository) ([]*Policy, error) {
ids, err := rep.Manifests.Find(ctx, map[string]string{
"type": "policy",
})
if err != nil {
return nil, fmt.Errorf("unable to list manifests: %v", err)
}
var policies []*Policy
for _, id := range ids {
pol := &Policy{}
err := rep.Manifests.Get(id.ID, pol)
err := rep.Manifests.Get(ctx, id.ID, pol)
if err != nil {
return nil, err
}
md, err := rep.Manifests.GetMetadata(id.ID)
md, err := rep.Manifests.GetMetadata(ctx, id.ID)
if err != nil {
return nil, err
}
@@ -162,10 +193,10 @@ func ListPolicies(rep *repo.Repository) ([]*Policy, error) {
}
// FilesPolicyGetter returns ignorefs.FilesPolicyGetter for a given source.
func FilesPolicyGetter(rep *repo.Repository, si snapshot.SourceInfo) (ignorefs.FilesPolicyGetter, error) {
func FilesPolicyGetter(ctx context.Context, rep *repo.Repository, si snapshot.SourceInfo) (ignorefs.FilesPolicyGetter, error) {
result := ignorefs.FilesPolicyMap{}
pol, _, err := GetEffectivePolicy(rep, si)
pol, _, err := GetEffectivePolicy(ctx, rep, si)
if err != nil {
return nil, err
}
@@ -173,17 +204,20 @@ func FilesPolicyGetter(rep *repo.Repository, si snapshot.SourceInfo) (ignorefs.F
result["."] = &pol.FilesPolicy
// Find all policies for this host and user
policies := rep.Manifests.Find(map[string]string{
policies, err := rep.Manifests.Find(ctx, map[string]string{
"type": "policy",
"policyType": "path",
"username": si.UserName,
"hostname": si.Host,
})
if err != nil {
return nil, fmt.Errorf("unable to find manifests for %v@%v: %v", si.UserName, si.Host, err)
}
log.Debugf("found %v policies for %v@%v", si.UserName, si.Host)
for _, id := range policies {
em, err := rep.Manifests.GetMetadata(id.ID)
em, err := rep.Manifests.GetMetadata(ctx, id.ID)
if err != nil {
return nil, err
}
@@ -201,7 +235,7 @@ func FilesPolicyGetter(rep *repo.Repository, si snapshot.SourceInfo) (ignorefs.F
rel = "./" + rel
log.Debugf("loading policy for %v (%v)", policyPath, rel)
pol := &Policy{}
if err := rep.Manifests.Get(id.ID, pol); err != nil {
if err := rep.Manifests.Get(ctx, id.ID, pol); err != nil {
return nil, fmt.Errorf("unable to load policy %v: %v", id.ID, err)
}
result[rel] = &pol.FilesPolicy

View File

@@ -32,6 +32,7 @@ type Manager struct {
mu sync.Mutex
b *block.Manager
initialized bool
pendingEntries map[string]*manifestEntry
committedEntries map[string]*manifestEntry
@@ -39,10 +40,14 @@ type Manager struct {
}
// Put serializes the provided payload to JSON and persists it. Returns unique handle that represents the object.
func (m *Manager) Put(labels map[string]string, payload interface{}) (string, error) {
func (m *Manager) Put(ctx context.Context, labels map[string]string, payload interface{}) (string, error) {
if labels["type"] == "" {
return "", fmt.Errorf("'type' label is required")
}
if err := m.ensureInitialized(ctx); err != nil {
return "", err
}
m.mu.Lock()
defer m.mu.Unlock()
@@ -69,7 +74,11 @@ func (m *Manager) Put(labels map[string]string, payload interface{}) (string, er
}
// GetMetadata returns metadata about provided manifest item or ErrNotFound if the item can't be found.
func (m *Manager) GetMetadata(id string) (*EntryMetadata, error) {
func (m *Manager) GetMetadata(ctx context.Context, id string) (*EntryMetadata, error) {
if err := m.ensureInitialized(ctx); err != nil {
return nil, err
}
m.mu.Lock()
defer m.mu.Unlock()
@@ -92,8 +101,12 @@ func (m *Manager) GetMetadata(id string) (*EntryMetadata, error) {
// Get retrieves the contents of the provided manifest item by deserializing it as JSON to provided object.
// If the manifest is not found, returns ErrNotFound.
func (m *Manager) Get(id string, data interface{}) error {
b, err := m.GetRaw(id)
func (m *Manager) Get(ctx context.Context, id string, data interface{}) error {
if err := m.ensureInitialized(ctx); err != nil {
return err
}
b, err := m.GetRaw(ctx, id)
if err != nil {
return err
}
@@ -106,7 +119,11 @@ func (m *Manager) Get(id string, data interface{}) error {
}
// GetRaw returns raw contents of the provided manifest (JSON bytes) or ErrNotFound if not found.
func (m *Manager) GetRaw(id string) ([]byte, error) {
func (m *Manager) GetRaw(ctx context.Context, id string) ([]byte, error) {
if err := m.ensureInitialized(ctx); err != nil {
return nil, err
}
m.mu.Lock()
defer m.mu.Unlock()
@@ -122,7 +139,11 @@ func (m *Manager) GetRaw(id string) ([]byte, error) {
}
// Find returns the list of EntryMetadata for manifest entries matching all provided labels.
func (m *Manager) Find(labels map[string]string) []*EntryMetadata {
func (m *Manager) Find(ctx context.Context, labels map[string]string) ([]*EntryMetadata, error) {
if err := m.ensureInitialized(ctx); err != nil {
return nil, err
}
m.mu.Lock()
defer m.mu.Unlock()
@@ -146,7 +167,7 @@ func (m *Manager) Find(labels map[string]string) []*EntryMetadata {
sort.Slice(matches, func(i, j int) bool {
return matches[i].ModTime.Before(matches[j].ModTime)
})
return matches
return matches, nil
}
func cloneEntryMetadata(e *manifestEntry) *EntryMetadata {
@@ -232,14 +253,14 @@ func (m *Manager) Delete(id string) {
// Refresh updates the committed blocks from the underlying storage.
func (m *Manager) Refresh(ctx context.Context) error {
return m.loadCommittedBlocks(ctx)
}
func (m *Manager) loadCommittedBlocks(ctx context.Context) error {
log.Debugf("listing manifest blocks")
m.mu.Lock()
defer m.mu.Unlock()
return m.loadCommittedBlocksLocked(ctx)
}
func (m *Manager) loadCommittedBlocksLocked(ctx context.Context) error {
log.Debugf("listing manifest blocks")
for {
blocks, err := m.b.ListBlocks(manifestBlockPrefix)
if err != nil {
@@ -451,6 +472,22 @@ func (m *Manager) mergeEntry(e *manifestEntry) {
}
}
func (m *Manager) ensureInitialized(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.initialized {
return nil
}
if err := m.loadCommittedBlocksLocked(ctx); err != nil {
return err
}
m.initialized = true
return nil
}
func copyLabels(m map[string]string) map[string]string {
r := map[string]string{}
for k, v := range m {
@@ -468,9 +505,5 @@ func NewManager(ctx context.Context, b *block.Manager) (*Manager, error) {
committedBlockIDs: map[string]bool{},
}
if err := m.loadCommittedBlocks(ctx); err != nil {
return nil, err
}
return m, nil
}

View File

@@ -8,8 +8,8 @@
"testing"
"time"
"github.com/kopia/kopia/repo/internal/storagetesting"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/repo/internal/storagetesting"
)
func TestManifest(t *testing.T) {
@@ -28,9 +28,9 @@ func TestManifest(t *testing.T) {
labels2 := map[string]string{"type": "item", "color": "blue", "shape": "square"}
labels3 := map[string]string{"type": "item", "shape": "square", "color": "red"}
id1 := addAndVerify(t, mgr, labels1, item1)
id2 := addAndVerify(t, mgr, labels2, item2)
id3 := addAndVerify(t, mgr, labels3, item3)
id1 := addAndVerify(ctx, t, mgr, labels1, item1)
id2 := addAndVerify(ctx, t, mgr, labels2, item2)
id3 := addAndVerify(ctx, t, mgr, labels3, item3)
cases := []struct {
criteria map[string]string
@@ -46,11 +46,11 @@ func TestManifest(t *testing.T) {
// verify before flush
for _, tc := range cases {
verifyMatches(t, mgr, tc.criteria, tc.expected)
verifyMatches(ctx, t, mgr, tc.criteria, tc.expected)
}
verifyItem(t, mgr, id1, labels1, item1)
verifyItem(t, mgr, id2, labels2, item2)
verifyItem(t, mgr, id3, labels3, item3)
verifyItem(ctx, t, mgr, id1, labels1, item1)
verifyItem(ctx, t, mgr, id2, labels2, item2)
verifyItem(ctx, t, mgr, id3, labels3, item3)
if err := mgr.Flush(ctx); err != nil {
t.Errorf("flush error: %v", err)
@@ -61,11 +61,11 @@ func TestManifest(t *testing.T) {
// verify after flush
for _, tc := range cases {
verifyMatches(t, mgr, tc.criteria, tc.expected)
verifyMatches(ctx, t, mgr, tc.criteria, tc.expected)
}
verifyItem(t, mgr, id1, labels1, item1)
verifyItem(t, mgr, id2, labels2, item2)
verifyItem(t, mgr, id3, labels3, item3)
verifyItem(ctx, t, mgr, id1, labels1, item1)
verifyItem(ctx, t, mgr, id2, labels2, item2)
verifyItem(ctx, t, mgr, id3, labels3, item3)
// flush underlying block manager and verify in new manifest manager.
mgr.b.Flush(ctx)
@@ -74,11 +74,11 @@ func TestManifest(t *testing.T) {
t.Fatalf("can't open block manager: %v", setupErr)
}
for _, tc := range cases {
verifyMatches(t, mgr2, tc.criteria, tc.expected)
verifyMatches(ctx, t, mgr2, tc.criteria, tc.expected)
}
verifyItem(t, mgr2, id1, labels1, item1)
verifyItem(t, mgr2, id2, labels2, item2)
verifyItem(t, mgr2, id3, labels3, item3)
verifyItem(ctx, t, mgr2, id1, labels1, item1)
verifyItem(ctx, t, mgr2, id2, labels2, item2)
verifyItem(ctx, t, mgr2, id3, labels3, item3)
if err := mgr2.Flush(ctx); err != nil {
t.Errorf("flush error: %v", err)
}
@@ -86,13 +86,13 @@ func TestManifest(t *testing.T) {
// delete from one
time.Sleep(1 * time.Second)
mgr.Delete(id3)
verifyItemNotFound(t, mgr, id3)
verifyItemNotFound(ctx, t, mgr, id3)
mgr.Flush(ctx)
verifyItemNotFound(t, mgr, id3)
verifyItemNotFound(ctx, t, mgr, id3)
// still found in another
verifyItem(t, mgr2, id3, labels3, item3)
if err := mgr2.loadCommittedBlocks(ctx); err != nil {
verifyItem(ctx, t, mgr2, id3, labels3, item3)
if err := mgr2.loadCommittedBlocksLocked(ctx); err != nil {
t.Errorf("unable to load: %v", err)
}
@@ -115,27 +115,27 @@ func TestManifest(t *testing.T) {
t.Fatalf("can't open manager: %v", err)
}
verifyItem(t, mgr3, id1, labels1, item1)
verifyItem(t, mgr3, id2, labels2, item2)
verifyItemNotFound(t, mgr3, id3)
verifyItem(ctx, t, mgr3, id1, labels1, item1)
verifyItem(ctx, t, mgr3, id2, labels2, item2)
verifyItemNotFound(ctx, t, mgr3, id3)
}
func addAndVerify(t *testing.T, mgr *Manager, labels map[string]string, data map[string]int) string {
func addAndVerify(ctx context.Context, t *testing.T, mgr *Manager, labels map[string]string, data map[string]int) string {
t.Helper()
id, err := mgr.Put(labels, data)
id, err := mgr.Put(ctx, labels, data)
if err != nil {
t.Errorf("unable to add %v (%v): %v", labels, data, err)
return ""
}
verifyItem(t, mgr, id, labels, data)
verifyItem(ctx, t, mgr, id, labels, data)
return id
}
func verifyItem(t *testing.T, mgr *Manager, id string, labels map[string]string, data map[string]int) {
func verifyItem(ctx context.Context, t *testing.T, mgr *Manager, id string, labels map[string]string, data map[string]int) {
t.Helper()
l, err := mgr.GetMetadata(id)
l, err := mgr.GetMetadata(ctx, id)
if err != nil {
t.Errorf("unable to retrieve %q: %v", id, err)
return
@@ -146,21 +146,26 @@ func verifyItem(t *testing.T, mgr *Manager, id string, labels map[string]string,
}
}
func verifyItemNotFound(t *testing.T, mgr *Manager, id string) {
func verifyItemNotFound(ctx context.Context, t *testing.T, mgr *Manager, id string) {
t.Helper()
_, err := mgr.GetMetadata(id)
_, err := mgr.GetMetadata(ctx, id)
if got, want := err, ErrNotFound; got != want {
t.Errorf("invalid error when getting %q %v, expected %v", id, err, ErrNotFound)
return
}
}
func verifyMatches(t *testing.T, mgr *Manager, labels map[string]string, expected []string) {
func verifyMatches(ctx context.Context, t *testing.T, mgr *Manager, labels map[string]string, expected []string) {
t.Helper()
var matches []string
for _, m := range mgr.Find(labels) {
items, err := mgr.Find(ctx, labels)
if err != nil {
t.Errorf("error in Find(): %v", err)
return
}
for _, m := range items {
matches = append(matches, m.ID)
}
sort.Strings(matches)

View File

@@ -280,12 +280,15 @@ func refresh(ctx context.Context, t *testing.T, r *repo.Repository) error {
}
func readRandomManifest(ctx context.Context, t *testing.T, r *repo.Repository) error {
manifests := r.Manifests.Find(nil)
manifests, err := r.Manifests.Find(ctx, nil)
if err != nil {
return err
}
if len(manifests) == 0 {
return nil
}
n := rand.Intn(len(manifests))
_, err := r.Manifests.GetRaw(manifests[n].ID)
_, err = r.Manifests.GetRaw(ctx, manifests[n].ID)
return err
}
@@ -298,7 +301,7 @@ func writeRandomManifest(ctx context.Context, t *testing.T, r *repo.Repository)
content2 := fmt.Sprintf("content-%v", rand.Intn(10))
content1val := fmt.Sprintf("val1-%v", rand.Intn(10))
content2val := fmt.Sprintf("val2-%v", rand.Intn(10))
_, err := r.Manifests.Put(map[string]string{
_, err := r.Manifests.Put(ctx, map[string]string{
"type": key1,
key1: val1,
key2: val2,

View File

@@ -2,6 +2,9 @@
package snapshot
import (
"context"
"fmt"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
@@ -10,10 +13,13 @@
var log = kopialogging.Logger("kopia/snapshot")
// ListSources lists all snapshot sources in a given repository.
func ListSources(rep *repo.Repository) []SourceInfo {
items := rep.Manifests.Find(map[string]string{
func ListSources(ctx context.Context, rep *repo.Repository) ([]SourceInfo, error) {
items, err := rep.Manifests.Find(ctx, map[string]string{
"type": "snapshot",
})
if err != nil {
return nil, fmt.Errorf("unable to find manifest entries: %v", err)
}
uniq := map[SourceInfo]bool{}
for _, it := range items {
@@ -25,7 +31,7 @@ func ListSources(rep *repo.Repository) []SourceInfo {
infos = append(infos, k)
}
return infos
return infos, nil
}
func sourceInfoFromLabels(labels map[string]string) SourceInfo {
@@ -42,15 +48,19 @@ func sourceInfoToLabels(si SourceInfo) map[string]string {
}
// ListSnapshots lists all snapshots for a given source.
func ListSnapshots(rep *repo.Repository, si SourceInfo) ([]*Manifest, error) {
return LoadSnapshots(rep, manifest.EntryIDs(rep.Manifests.Find(sourceInfoToLabels(si))))
func ListSnapshots(ctx context.Context, rep *repo.Repository, si SourceInfo) ([]*Manifest, error) {
entries, err := rep.Manifests.Find(ctx, sourceInfoToLabels(si))
if err != nil {
return nil, fmt.Errorf("unable to find manifest entries: %v", err)
}
return LoadSnapshots(ctx, rep, manifest.EntryIDs(entries))
}
// LoadSnapshot loads and parses a snapshot with a given ID.
func LoadSnapshot(rep *repo.Repository, manifestID string) (*Manifest, error) {
func LoadSnapshot(ctx context.Context, rep *repo.Repository, manifestID string) (*Manifest, error) {
sm := &Manifest{}
if err := rep.Manifests.Get(manifestID, sm); err != nil {
return nil, err
if err := rep.Manifests.Get(ctx, manifestID, sm); err != nil {
return nil, fmt.Errorf("unable to find manifest entries: %v", err)
}
sm.ID = manifestID
@@ -59,12 +69,12 @@ func LoadSnapshot(rep *repo.Repository, manifestID string) (*Manifest, error) {
}
// SaveSnapshot persists given snapshot manifest and returns manifest ID.
func SaveSnapshot(rep *repo.Repository, manifest *Manifest) (string, error) {
return rep.Manifests.Put(sourceInfoToLabels(manifest.Source), manifest)
func SaveSnapshot(ctx context.Context, rep *repo.Repository, manifest *Manifest) (string, error) {
return rep.Manifests.Put(ctx, sourceInfoToLabels(manifest.Source), manifest)
}
// LoadSnapshots efficiently loads and parses a given list of snapshot IDs.
func LoadSnapshots(rep *repo.Repository, names []string) ([]*Manifest, error) {
func LoadSnapshots(ctx context.Context, rep *repo.Repository, names []string) ([]*Manifest, error) {
result := make([]*Manifest, len(names))
sem := make(chan bool, 50)
@@ -73,7 +83,7 @@ func LoadSnapshots(rep *repo.Repository, names []string) ([]*Manifest, error) {
go func(i int, n string) {
defer func() { <-sem }()
m, err := LoadSnapshot(rep, n)
m, err := LoadSnapshot(ctx, rep, n)
if err != nil {
log.Warningf("unable to parse snapshot manifest %v: %v", n, err)
return
@@ -98,7 +108,7 @@ func LoadSnapshots(rep *repo.Repository, names []string) ([]*Manifest, error) {
}
// ListSnapshotManifests returns the list of snapshot manifests for a given source or all sources if nil.
func ListSnapshotManifests(rep *repo.Repository, src *SourceInfo) []string {
func ListSnapshotManifests(ctx context.Context, rep *repo.Repository, src *SourceInfo) ([]string, error) {
labels := map[string]string{
"type": "snapshot",
}
@@ -107,5 +117,10 @@ func ListSnapshotManifests(rep *repo.Repository, src *SourceInfo) []string {
labels = sourceInfoToLabels(*src)
}
return manifest.EntryIDs(rep.Manifests.Find(labels))
entries, err := rep.Manifests.Find(ctx, labels)
if err != nil {
return nil, fmt.Errorf("unable to find manifest entries: %v", err)
}
return manifest.EntryIDs(entries), nil
}