From ab2c906f2c76eed83973ab6d821648dcbe977a96 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 21 Feb 2020 22:54:02 -0800 Subject: [PATCH] server: implemented remaining server API methods CreateSnapshotSource API for ensuring source exists Upload - starts upload on a given source or matching sources Cancel - cancels upload on a given source or matching sources --- internal/server/api_policies.go | 74 ++++++------ internal/server/api_repo.go | 8 +- ...{api_snapshot_list.go => api_snapshots.go} | 30 +---- internal/server/api_sources.go | 111 ++++++++++++++++++ internal/server/api_sources_list.go | 29 ----- internal/server/server.go | 77 +++++++----- internal/server/source_manager.go | 52 +++++--- internal/serverapi/client_wrappers.go | 77 +++++++++++- internal/serverapi/serverapi.go | 51 ++++++-- tests/end_to_end_test/server_start_test.go | 90 +++++++++++++- 10 files changed, 434 insertions(+), 165 deletions(-) rename internal/server/{api_snapshot_list.go => api_snapshots.go} (60%) create mode 100644 internal/server/api_sources.go delete mode 100644 internal/server/api_sources_list.go diff --git a/internal/server/api_policies.go b/internal/server/api_policies.go index 2cdc11f1d..c15128bae 100644 --- a/internal/server/api_policies.go +++ b/internal/server/api_policies.go @@ -4,6 +4,7 @@ "context" "encoding/json" "net/http" + "net/url" "github.com/kopia/kopia/internal/serverapi" "github.com/kopia/kopia/snapshot" @@ -36,45 +37,44 @@ func (s *Server) handlePolicyList(ctx context.Context, r *http.Request) (interfa return resp, nil } -func (s *Server) handlePolicyCRUD(ctx context.Context, r *http.Request) (interface{}, *apiError) { - host := r.URL.Query().Get("host") - path := r.URL.Query().Get("path") - username := r.URL.Query().Get("userName") - target := snapshot.SourceInfo{ +func getPolicyTargetFromURL(u *url.URL) snapshot.SourceInfo { + host := u.Query().Get("host") + path := u.Query().Get("path") + username := u.Query().Get("userName") + + return snapshot.SourceInfo{ Host: host, Path: path, UserName: username, } - - switch r.Method { - case "GET": - pol, err := policy.GetDefinedPolicy(ctx, s.rep, target) - if err == policy.ErrPolicyNotFound { - return nil, requestError(serverapi.ErrorNotFound, "policy not found") - } - - return pol, nil - - case "DELETE": - if err := policy.RemovePolicy(ctx, s.rep, target); err != nil { - return nil, internalServerError(err) - } - - return &serverapi.Empty{}, nil - - case "PUT": - newPolicy := &policy.Policy{} - if err := json.NewDecoder(r.Body).Decode(newPolicy); err != nil { - return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body") - } - - if err := policy.SetPolicy(ctx, s.rep, target, newPolicy); err != nil { - return nil, internalServerError(err) - } - - return &serverapi.Empty{}, nil - - default: - return nil, requestError(serverapi.ErrorMalformedRequest, "incompatible HTTP method") - } +} + +func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request) (interface{}, *apiError) { + pol, err := policy.GetDefinedPolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL)) + if err == policy.ErrPolicyNotFound { + return nil, requestError(serverapi.ErrorNotFound, "policy not found") + } + + return pol, nil +} + +func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) { + if err := policy.RemovePolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL)); err != nil { + return nil, internalServerError(err) + } + + return &serverapi.Empty{}, nil +} + +func (s *Server) handlePolicyPut(ctx context.Context, r *http.Request) (interface{}, *apiError) { + newPolicy := &policy.Policy{} + if err := json.NewDecoder(r.Body).Decode(newPolicy); err != nil { + return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body") + } + + if err := policy.SetPolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL), newPolicy); err != nil { + return nil, internalServerError(err) + } + + return &serverapi.Empty{}, nil } diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go index 99f2772c7..bff2e2a0e 100644 --- a/internal/server/api_repo.go +++ b/internal/server/api_repo.go @@ -36,7 +36,7 @@ func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interfa }, nil } -func maybeDecodeToken(req *serverapi.ConnectRequest) *apiError { +func maybeDecodeToken(req *serverapi.ConnectRepositoryRequest) *apiError { if req.Token != "" { ci, password, err := repo.DecodeToken(req.Token) if err != nil { @@ -57,13 +57,13 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected") } - var req serverapi.CreateRequest + var req serverapi.CreateRepositoryRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error()) } - if err := maybeDecodeToken(&req.ConnectRequest); err != nil { + if err := maybeDecodeToken(&req.ConnectRepositoryRequest); err != nil { return nil, err } @@ -97,7 +97,7 @@ func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interf return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected") } - var req serverapi.ConnectRequest + var req serverapi.ConnectRepositoryRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error()) diff --git a/internal/server/api_snapshot_list.go b/internal/server/api_snapshots.go similarity index 60% rename from internal/server/api_snapshot_list.go rename to internal/server/api_snapshots.go index 96859d493..36cb30fd7 100644 --- a/internal/server/api_snapshot_list.go +++ b/internal/server/api_snapshots.go @@ -4,31 +4,13 @@ "context" "net/http" "net/url" - "time" - "github.com/kopia/kopia/fs" - "github.com/kopia/kopia/repo/manifest" + "github.com/kopia/kopia/internal/serverapi" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" ) -type snapshotListEntry struct { - ID manifest.ID `json:"id"` - Source snapshot.SourceInfo `json:"source"` - Description string `json:"description"` - StartTime time.Time `json:"startTime"` - EndTime time.Time `json:"endTime"` - IncompleteReason string `json:"incomplete,omitempty"` - Summary *fs.DirectorySummary `json:"summary"` - RootEntry string `json:"rootID"` - RetentionReasons []string `json:"retention"` -} - -type snapshotListResponse struct { - Snapshots []*snapshotListEntry `json:"snapshots"` -} - -func (s *Server) handleSourceSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) { manifestIDs, err := snapshot.ListSnapshotManifests(ctx, s.rep, nil) if err != nil { return nil, internalServerError(err) @@ -39,8 +21,8 @@ func (s *Server) handleSourceSnapshotList(ctx context.Context, r *http.Request) return nil, internalServerError(err) } - resp := &snapshotListResponse{ - Snapshots: []*snapshotListEntry{}, + resp := &serverapi.SnapshotsResponse{ + Snapshots: []*serverapi.Snapshot{}, } groups := snapshot.GroupBySource(manifests) @@ -79,8 +61,8 @@ func sourceMatchesURLFilter(src snapshot.SourceInfo, query url.Values) bool { return true } -func convertSnapshotManifest(m *snapshot.Manifest) *snapshotListEntry { - e := &snapshotListEntry{ +func convertSnapshotManifest(m *snapshot.Manifest) *serverapi.Snapshot { + e := &serverapi.Snapshot{ ID: m.ID, Source: m.Source, Description: m.Description, diff --git a/internal/server/api_sources.go b/internal/server/api_sources.go new file mode 100644 index 000000000..63dbfd3af --- /dev/null +++ b/internal/server/api_sources.go @@ -0,0 +1,111 @@ +package server + +import ( + "context" + "encoding/json" + "net/http" + "os" + "sort" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/serverapi" + "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" +) + +func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) { + resp := &serverapi.SourcesResponse{ + Sources: []*serverapi.SourceStatus{}, + } + + for _, v := range s.sourceManagers { + if !sourceMatchesURLFilter(v.src, r.URL.Query()) { + continue + } + + resp.Sources = append(resp.Sources, v.Status()) + } + + sort.Slice(resp.Sources, func(i, j int) bool { + return resp.Sources[i].Source.String() < resp.Sources[j].Source.String() + }) + + return resp, nil +} + +func (s *Server) handleSourcesCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) { + var req serverapi.CreateSnapshotSourceRequest + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body") + } + + if req.Path == "" { + return nil, requestError(serverapi.ErrorMalformedRequest, "missing path") + } + + _, err := os.Stat(req.Path) + if os.IsNotExist(err) { + return nil, requestError(serverapi.ErrorPathNotFound, "path does not exist") + } + + if err != nil { + return nil, internalServerError(err) + } + + sourceInfo := snapshot.SourceInfo{ + UserName: s.options.Username, + Host: s.options.Hostname, + Path: req.Path, + } + + // ensure we have the policy for this source, otherwise it will not show up in the + // list of sources at all. + _, err = policy.GetDefinedPolicy(ctx, s.rep, sourceInfo) + switch err { + case nil: + log.Debugf("policy for %v already exists", sourceInfo) + // have policy, do nothing + + case policy.ErrPolicyNotFound: + // don't have policy - create an empty one + log.Debugf("policy for %v not found, creating empty one", sourceInfo) + + if err = policy.SetPolicy(ctx, s.rep, sourceInfo, &policy.Policy{}); err != nil { + return nil, internalServerError(errors.Wrap(err, "unable to set policy")) + } + + if err = s.rep.Flush(ctx); err != nil { + return nil, internalServerError(errors.Wrap(err, "unable to flush")) + } + + default: + return nil, internalServerError(err) + } + + // upgrade to exclusive lock to ensure we have source manager + s.mu.RUnlock() + s.mu.Lock() + if s.sourceManagers[sourceInfo] == nil { + log.Debugf("creating source manager for %v", sourceInfo) + sm := newSourceManager(sourceInfo, s) + s.sourceManagers[sourceInfo] = sm + + go sm.run(context.Background()) + } + s.mu.Unlock() + s.mu.RLock() + + manager := s.sourceManagers[sourceInfo] + if manager == nil { + return nil, internalServerError(errors.Errorf("could not find source manager that was just created")) + } + + if req.CreateSnapshot { + log.Debugf("scheduling snapshot of %v immediately...", sourceInfo) + manager.scheduleSnapshotNow() + } + + return &serverapi.Empty{}, nil +} diff --git a/internal/server/api_sources_list.go b/internal/server/api_sources_list.go deleted file mode 100644 index c115460f6..000000000 --- a/internal/server/api_sources_list.go +++ /dev/null @@ -1,29 +0,0 @@ -package server - -import ( - "context" - "net/http" - "sort" - - "github.com/kopia/kopia/internal/serverapi" -) - -func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) { - resp := &serverapi.SourcesResponse{ - Sources: []*serverapi.SourceStatus{}, - } - - for _, v := range s.sourceManagers { - if !sourceMatchesURLFilter(v.src, r.URL.Query()) { - continue - } - - resp.Sources = append(resp.Sources, v.Status()) - } - - sort.Slice(resp.Sources, func(i, j int) bool { - return resp.Sources[i].Source.String() < resp.Sources[j].Source.String() - }) - - return resp, nil -} diff --git a/internal/server/server.go b/internal/server/server.go index 65f77de90..646b510ab 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -16,6 +16,7 @@ "github.com/kopia/kopia/internal/serverapi" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" ) var log = kopialogging.Logger("kopia/server") @@ -37,32 +38,37 @@ type Server struct { // APIHandlers handles API requests. func (s *Server) APIHandlers() http.Handler { - mux := mux.NewRouter() + m := mux.NewRouter() - mux.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesList)).Methods("GET") - mux.HandleFunc("/api/v1/snapshots", s.handleAPI(s.handleSourceSnapshotList)).Methods("GET") - mux.HandleFunc("/api/v1/policies", s.handleAPI(s.handlePolicyList)).Methods("GET") - mux.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyCRUD)).Methods("GET", "PUT", "DELETE") + // sources + m.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesList)).Methods("GET") + m.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesCreate)).Methods("POST") + m.HandleFunc("/api/v1/sources/upload", s.handleAPI(s.handleUpload)).Methods("POST") + m.HandleFunc("/api/v1/sources/cancel", s.handleAPI(s.handleCancel)).Methods("POST") - mux.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh)).Methods("POST") - mux.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush)).Methods("POST") - mux.HandleFunc("/api/v1/shutdown", s.handleAPIPossiblyNotConnected(s.handleShutdown)).Methods("POST") + // snapshots + m.HandleFunc("/api/v1/snapshots", s.handleAPI(s.handleSnapshotList)).Methods("GET") - mux.HandleFunc("/api/v1/sources/pause", s.handleAPI(s.handlePause)).Methods("POST") - mux.HandleFunc("/api/v1/sources/resume", s.handleAPI(s.handleResume)).Methods("POST") - mux.HandleFunc("/api/v1/sources/upload", s.handleAPI(s.handleUpload)).Methods("POST") - mux.HandleFunc("/api/v1/sources/cancel", s.handleAPI(s.handleCancel)).Methods("POST") + m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyGet)).Methods("GET") + m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyPut)).Methods("PUT") + m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyDelete)).Methods("DELETE") - mux.HandleFunc("/api/v1/objects/", s.handleObjectGet) + m.HandleFunc("/api/v1/policies", s.handleAPI(s.handlePolicyList)).Methods("GET") - mux.HandleFunc("/api/v1/repo/status", s.handleAPIPossiblyNotConnected(s.handleRepoStatus)).Methods("GET") - mux.HandleFunc("/api/v1/repo/connect", s.handleAPIPossiblyNotConnected(s.handleRepoConnect)).Methods("POST") - mux.HandleFunc("/api/v1/repo/create", s.handleAPIPossiblyNotConnected(s.handleRepoCreate)).Methods("POST") - mux.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect)).Methods("POST") - mux.HandleFunc("/api/v1/repo/algorithms", s.handleAPIPossiblyNotConnected(s.handleRepoSupportedAlgorithms)).Methods("GET") - mux.HandleFunc("/api/v1/repo/sync", s.handleAPI(s.handleRepoSync)).Methods("POST") + m.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh)).Methods("POST") + m.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush)).Methods("POST") + m.HandleFunc("/api/v1/shutdown", s.handleAPIPossiblyNotConnected(s.handleShutdown)).Methods("POST") - return mux + m.HandleFunc("/api/v1/objects/", s.handleObjectGet).Methods("GET") + + m.HandleFunc("/api/v1/repo/status", s.handleAPIPossiblyNotConnected(s.handleRepoStatus)).Methods("GET") + m.HandleFunc("/api/v1/repo/connect", s.handleAPIPossiblyNotConnected(s.handleRepoConnect)).Methods("POST") + m.HandleFunc("/api/v1/repo/create", s.handleAPIPossiblyNotConnected(s.handleRepoCreate)).Methods("POST") + m.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect)).Methods("POST") + m.HandleFunc("/api/v1/repo/algorithms", s.handleAPIPossiblyNotConnected(s.handleRepoSupportedAlgorithms)).Methods("GET") + m.HandleFunc("/api/v1/repo/sync", s.handleAPI(s.handleRepoSync)).Methods("POST") + + return m } func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.HandlerFunc { @@ -152,14 +158,6 @@ func (s *Server) handleUpload(ctx context.Context, r *http.Request) (interface{} return s.forAllSourceManagersMatchingURLFilter((*sourceManager).upload, r.URL.Query()) } -func (s *Server) handlePause(ctx context.Context, r *http.Request) (interface{}, *apiError) { - return s.forAllSourceManagersMatchingURLFilter((*sourceManager).pause, r.URL.Query()) -} - -func (s *Server) handleResume(ctx context.Context, r *http.Request) (interface{}, *apiError) { - return s.forAllSourceManagersMatchingURLFilter((*sourceManager).resume, r.URL.Query()) -} - func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{}, *apiError) { return s.forAllSourceManagersMatchingURLFilter((*sourceManager).cancel, r.URL.Query()) } @@ -234,7 +232,7 @@ func (s *Server) refreshPeriodically(ctx context.Context, r *repo.Repository) { log.Warningf("error refreshing repository: %v", err) } - if err := s.syncSourcesLocked(ctx); err != nil { + if err := s.SyncSources(ctx); err != nil { log.Warningf("unable to sync sources: %v", err) } } @@ -270,11 +268,28 @@ func (s *Server) stopAllSourceManagersLocked() { } func (s *Server) syncSourcesLocked(ctx context.Context) error { - sources, err := snapshot.ListSources(ctx, s.rep) + sources := map[snapshot.SourceInfo]bool{} + + snapshotSources, err := snapshot.ListSources(ctx, s.rep) if err != nil { return errors.Wrap(err, "unable to list sources") } + policies, err := policy.ListPolicies(ctx, s.rep) + if err != nil { + return errors.Wrap(err, "unable to list sources") + } + + for _, ss := range snapshotSources { + sources[ss] = true + } + + for _, pol := range policies { + if pol.Target().Path != "" && pol.Target().Host != "" && pol.Target().UserName != "" { + sources[pol.Target()] = true + } + } + // copy existing sources to a map, from which we will remove sources that are found // in the repository oldSourceManagers := map[snapshot.SourceInfo]*sourceManager{} @@ -282,7 +297,7 @@ func (s *Server) syncSourcesLocked(ctx context.Context) error { oldSourceManagers[k] = v } - for _, src := range sources { + for src := range sources { if _, ok := oldSourceManagers[src]; ok { // pre-existing source, already has a manager delete(oldSourceManagers, src) diff --git a/internal/server/source_manager.go b/internal/server/source_manager.go index 601b8354a..2e1dc9c10 100644 --- a/internal/server/source_manager.go +++ b/internal/server/source_manager.go @@ -28,10 +28,11 @@ type sourceManager struct { snapshotfs.NullUploadProgress - server *Server - src snapshot.SourceInfo - closed chan struct{} - wg sync.WaitGroup + server *Server + src snapshot.SourceInfo + closed chan struct{} + snapshotRequests chan struct{} + wg sync.WaitGroup mu sync.RWMutex uploader *snapshotfs.Uploader @@ -51,11 +52,14 @@ func (s *sourceManager) Status() *serverapi.SourceStatus { st := &serverapi.SourceStatus{ Source: s.src, Status: s.state, - LastSnapshotTime: s.lastSnapshot.StartTime, NextSnapshotTime: s.nextSnapshotTime, SchedulingPolicy: s.pol, } + if ls := s.lastSnapshot; ls != nil { + st.LastSnapshotTime = &ls.StartTime + } + if st.Status == "SNAPSHOTTING" { c := s.progress.Snapshot() @@ -120,6 +124,12 @@ func (s *sourceManager) runLocal(ctx context.Context) { case <-s.closed: return + case <-s.snapshotRequests: + nt := time.Now() + s.nextSnapshotTime = &nt + + continue + case <-time.After(statusRefreshInterval): s.refreshStatus(ctx) @@ -146,23 +156,28 @@ func (s *sourceManager) runRemote(ctx context.Context) { } } +func (s *sourceManager) scheduleSnapshotNow() { + select { + case s.snapshotRequests <- struct{}{}: // scheduled snapshot + default: // already scheduled + } +} + func (s *sourceManager) upload() serverapi.SourceActionResponse { log.Infof("upload triggered via API: %v", s.src) + s.scheduleSnapshotNow() + return serverapi.SourceActionResponse{Success: true} } func (s *sourceManager) cancel() serverapi.SourceActionResponse { log.Infof("cancel triggered via API: %v", s.src) - return serverapi.SourceActionResponse{Success: true} -} -func (s *sourceManager) pause() serverapi.SourceActionResponse { - log.Infof("pause triggered via API: %v", s.src) - return serverapi.SourceActionResponse{Success: true} -} + if u := s.currentUploader(); u != nil { + log.Infof("canceling current upload") + u.Cancel() + } -func (s *sourceManager) resume() serverapi.SourceActionResponse { - log.Infof("resume triggered via API: %v", s.src) return serverapi.SourceActionResponse{Success: true} } @@ -296,11 +311,12 @@ func (s *sourceManager) refreshStatus(ctx context.Context) { func newSourceManager(src snapshot.SourceInfo, server *Server) *sourceManager { m := &sourceManager{ - src: src, - server: server, - state: "UNKNOWN", - closed: make(chan struct{}), - progress: &snapshotfs.CountingUploadProgress{}, + src: src, + server: server, + state: "UNKNOWN", + closed: make(chan struct{}), + snapshotRequests: make(chan struct{}, 1), + progress: &snapshotfs.CountingUploadProgress{}, } return m diff --git a/internal/serverapi/client_wrappers.go b/internal/serverapi/client_wrappers.go index 37ad4989d..2594806a7 100644 --- a/internal/serverapi/client_wrappers.go +++ b/internal/serverapi/client_wrappers.go @@ -1,14 +1,44 @@ package serverapi -import "context" +import ( + "context" + "strings" + + "github.com/kopia/kopia/snapshot" +) + +// CreateSnapshotSource creates snapshot source with a given path. +func (c *Client) CreateSnapshotSource(ctx context.Context, req *CreateSnapshotSourceRequest) error { + return c.Post("sources", req, &CreateSnapshotSourceRequest{}) +} + +// UploadSnapshots triggers snapshot upload on matching snapshots. +func (c *Client) UploadSnapshots(ctx context.Context, match *snapshot.SourceInfo) (*MultipleSourceActionResponse, error) { + resp := &MultipleSourceActionResponse{} + if err := c.Post("sources/upload"+matchSourceParameters(match), &Empty{}, resp); err != nil { + return nil, err + } + + return resp, nil +} + +// CancelUpload cancels snapshot upload on matching snapshots. +func (c *Client) CancelUpload(ctx context.Context, match *snapshot.SourceInfo) (*MultipleSourceActionResponse, error) { + resp := &MultipleSourceActionResponse{} + if err := c.Post("sources/cancel"+matchSourceParameters(match), &Empty{}, resp); err != nil { + return nil, err + } + + return resp, nil +} // CreateRepository invokes the 'repo/create' API. -func (c *Client) CreateRepository(ctx context.Context, req *CreateRequest) error { +func (c *Client) CreateRepository(ctx context.Context, req *CreateRepositoryRequest) error { return c.Post("repo/create", req, &StatusResponse{}) } // ConnectToRepository invokes the 'repo/connect' API. -func (c *Client) ConnectToRepository(ctx context.Context, req *ConnectRequest) error { +func (c *Client) ConnectToRepository(ctx context.Context, req *ConnectRepositoryRequest) error { return c.Post("repo/connect", req, &StatusResponse{}) } @@ -32,12 +62,47 @@ func (c *Client) Status(ctx context.Context) (*StatusResponse, error) { return resp, nil } -// Sources invokes the 'sources' API. -func (c *Client) Sources(ctx context.Context) (*SourcesResponse, error) { +// ListSources lists the snapshot sources managed by the server. +func (c *Client) ListSources(ctx context.Context, match *snapshot.SourceInfo) (*SourcesResponse, error) { resp := &SourcesResponse{} - if err := c.Get("sources", resp); err != nil { + if err := c.Get("sources"+matchSourceParameters(match), resp); err != nil { return nil, err } return resp, nil } + +// ListSnapshots invokes the 'sources' API. +func (c *Client) ListSnapshots(ctx context.Context, match *snapshot.SourceInfo) (*SnapshotsResponse, error) { + resp := &SnapshotsResponse{} + if err := c.Get("snapshots"+matchSourceParameters(match), resp); err != nil { + return nil, err + } + + return resp, nil +} + +func matchSourceParameters(match *snapshot.SourceInfo) string { + if match == nil { + return "" + } + + var clauses []string + if v := match.Host; v != "" { + clauses = append(clauses, "host="+v) + } + + if v := match.UserName; v != "" { + clauses = append(clauses, "username="+v) + } + + if v := match.Path; v != "" { + clauses = append(clauses, "path="+v) + } + + if len(clauses) == 0 { + return "" + } + + return "?" + strings.Join(clauses, "&") +} diff --git a/internal/serverapi/serverapi.go b/internal/serverapi/serverapi.go index dc5b47db6..c73c069a1 100644 --- a/internal/serverapi/serverapi.go +++ b/internal/serverapi/serverapi.go @@ -4,8 +4,10 @@ import ( "time" + "github.com/kopia/kopia/fs" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" "github.com/kopia/kopia/snapshot/snapshotfs" @@ -33,7 +35,7 @@ type SourceStatus struct { Source snapshot.SourceInfo `json:"source"` Status string `json:"status"` SchedulingPolicy policy.SchedulingPolicy `json:"schedule"` - LastSnapshotTime time.Time `json:"lastSnapshotTime,omitempty"` + LastSnapshotTime *time.Time `json:"lastSnapshotTime,omitempty"` NextSnapshotTime *time.Time `json:"nextSnapshotTime,omitempty"` UploadCounters *snapshotfs.UploadCounters `json:"upload,omitempty"` } @@ -60,15 +62,16 @@ type Empty struct { // Supported error codes. const ( ErrorInternal APIErrorCode = "INTERNAL" - ErrorMalformedRequest APIErrorCode = "MALFORMED_REQUEST" - ErrorInvalidToken APIErrorCode = "INVALID_TOKEN" - ErrorStorageConnection APIErrorCode = "STORAGE_CONNECTION" - ErrorAlreadyInitialized APIErrorCode = "ALREADY_INITIALIZED" - ErrorNotInitialized APIErrorCode = "NOT_INITIALIZED" ErrorAlreadyConnected APIErrorCode = "ALREADY_CONNECTED" - ErrorNotConnected APIErrorCode = "NOT_CONNECTED" + ErrorAlreadyInitialized APIErrorCode = "ALREADY_INITIALIZED" ErrorInvalidPassword APIErrorCode = "INVALID_PASSWORD" + ErrorInvalidToken APIErrorCode = "INVALID_TOKEN" + ErrorMalformedRequest APIErrorCode = "MALFORMED_REQUEST" + ErrorNotConnected APIErrorCode = "NOT_CONNECTED" ErrorNotFound APIErrorCode = "NOT_FOUND" + ErrorNotInitialized APIErrorCode = "NOT_INITIALIZED" + ErrorPathNotFound APIErrorCode = "PATH_NOT_FOUND" + ErrorStorageConnection APIErrorCode = "STORAGE_CONNECTION" ) // ErrorResponse represents error response. @@ -87,14 +90,14 @@ type MultipleSourceActionResponse struct { Sources map[string]SourceActionResponse `json:"sources"` } -// CreateRequest contains request to create a repository in a given storage -type CreateRequest struct { - ConnectRequest +// CreateRepositoryRequest contains request to create a repository in a given storage +type CreateRepositoryRequest struct { + ConnectRepositoryRequest NewRepositoryOptions repo.NewRepositoryOptions `json:"options"` } -// ConnectRequest contains request to connect to a repository. -type ConnectRequest struct { +// ConnectRepositoryRequest contains request to connect to a repository. +type ConnectRepositoryRequest struct { Storage blob.ConnectionInfo `json:"storage"` Password string `json:"password"` Token string `json:"token"` // when set, overrides Storage and Password @@ -110,3 +113,27 @@ type SupportedAlgorithmsResponse struct { SplitterAlgorithms []string `json:"splitter"` CompressionAlgorithms []string `json:"compression"` } + +// CreateSnapshotSourceRequest contains request to create snapshot source and optionally create first snapshot. +type CreateSnapshotSourceRequest struct { + Path string `json:"path"` + CreateSnapshot bool `json:"createSnapshot"` +} + +// Snapshot describes single snapshot entry. +type Snapshot struct { + ID manifest.ID `json:"id"` + Source snapshot.SourceInfo `json:"source"` + Description string `json:"description"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` + IncompleteReason string `json:"incomplete,omitempty"` + Summary *fs.DirectorySummary `json:"summary"` + RootEntry string `json:"rootID"` + RetentionReasons []string `json:"retention"` +} + +// SnapshotsResponse contains a list of snapshots. +type SnapshotsResponse struct { + Snapshots []*Snapshot `json:"snapshots"` +} diff --git a/tests/end_to_end_test/server_start_test.go b/tests/end_to_end_test/server_start_test.go index 86d007c7d..77ca948a6 100644 --- a/tests/end_to_end_test/server_start_test.go +++ b/tests/end_to_end_test/server_start_test.go @@ -6,9 +6,13 @@ "testing" "time" + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/retry" "github.com/kopia/kopia/internal/serverapi" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/filesystem" + "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/tests/testenv" ) @@ -80,7 +84,7 @@ func TestServerStart(t *testing.T) { t.Errorf("unexpected storage type: %v, want %v", got, want) } - sources, err := cli.Sources(ctx) + sources, err := cli.ListSources(ctx, nil) if err != nil { t.Fatalf("error listing sources: %v", err) } @@ -92,6 +96,30 @@ func TestServerStart(t *testing.T) { if got, want := sources.Sources[0].Source.Path, sharedTestDataDir1; got != want { t.Errorf("unexpected source path: %v, want %v", got, want) } + + if err = cli.CreateSnapshotSource(ctx, &serverapi.CreateSnapshotSourceRequest{ + Path: sharedTestDataDir2, + }); err != nil { + t.Fatalf("create snapshot source error: %v", err) + } + + verifySourceCount(t, cli, nil, 2) + verifySourceCount(t, cli, &snapshot.SourceInfo{Host: "no-such-host"}, 0) + verifySourceCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}, 1) + + verifySnapshotCount(t, cli, nil, 2) + verifySnapshotCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir1}, 2) + verifySnapshotCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}, 0) + verifySnapshotCount(t, cli, &snapshot.SourceInfo{Host: "no-such-host"}, 0) + + uploadMatchingSnapshots(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}) + waitForSnapshotCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}, 1) + + if _, err = cli.CancelUpload(ctx, nil); err != nil { + t.Fatalf("cancel failed: %v", err) + } + + verifySnapshotCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}, 1) } func TestServerStartWithoutInitialRepository(t *testing.T) { @@ -129,8 +157,8 @@ func TestServerStartWithoutInitialRepository(t *testing.T) { waitUntilServerStarted(t, cli) verifyServerConnected(t, cli, false) - if err = cli.CreateRepository(ctx, &serverapi.CreateRequest{ - ConnectRequest: serverapi.ConnectRequest{ + if err = cli.CreateRepository(ctx, &serverapi.CreateRepositoryRequest{ + ConnectRepositoryRequest: serverapi.ConnectRepositoryRequest{ Password: "foofoo", Storage: connInfo, }, @@ -146,7 +174,7 @@ func TestServerStartWithoutInitialRepository(t *testing.T) { verifyServerConnected(t, cli, false) - if err = cli.ConnectToRepository(ctx, &serverapi.ConnectRequest{ + if err = cli.ConnectToRepository(ctx, &serverapi.ConnectRepositoryRequest{ Password: "foofoo", Storage: connInfo, }); err != nil { @@ -171,6 +199,60 @@ func verifyServerConnected(t *testing.T, cli *serverapi.Client, want bool) *serv return st } +func waitForSnapshotCount(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) { + t.Helper() + + err := retry.WithExponentialBackoffNoValue("wait for snapshots", func() error { + snapshots, err := cli.ListSnapshots(context.Background(), match) + if err != nil { + return errors.Wrap(err, "error listing sources") + } + + if got := len(snapshots.Snapshots); got != want { + return errors.Errorf("unexpected number of snapshots %v, want %v", got, want) + } + + return nil + }, func(err error) bool { return err != nil }) + if err != nil { + t.Fatal(err) + } +} + +func uploadMatchingSnapshots(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo) { + t.Helper() + + if _, err := cli.UploadSnapshots(context.Background(), match); err != nil { + t.Fatalf("upload failed: %v", err) + } +} + +func verifySnapshotCount(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) { + t.Helper() + + snapshots, err := cli.ListSnapshots(context.Background(), match) + if err != nil { + t.Fatalf("error listing sources: %v", err) + } + + if got := len(snapshots.Snapshots); got != want { + t.Errorf("unexpected number of snapshots %v, want %v", got, want) + } +} + +func verifySourceCount(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) { + t.Helper() + + sources, err := cli.ListSources(context.Background(), match) + if err != nil { + t.Fatalf("error listing sources: %v", err) + } + + if got := len(sources.Sources); got != want { + t.Errorf("unexpected number of sources %v, want %v", got, want) + } +} + func waitUntilServerStarted(t *testing.T, cli *serverapi.Client) { for i := 0; i < 60; i++ { if _, err := cli.Status(context.Background()); err == nil {