mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 14:44:47 -04:00
server: implemented remaining server API methods
CreateSnapshotSource API for ensuring source exists Upload - starts upload on a given source or matching sources Cancel - cancels upload on a given source or matching sources
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/kopia/kopia/internal/serverapi"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
@@ -36,45 +37,44 @@ func (s *Server) handlePolicyList(ctx context.Context, r *http.Request) (interfa
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) handlePolicyCRUD(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
host := r.URL.Query().Get("host")
|
||||
path := r.URL.Query().Get("path")
|
||||
username := r.URL.Query().Get("userName")
|
||||
target := snapshot.SourceInfo{
|
||||
func getPolicyTargetFromURL(u *url.URL) snapshot.SourceInfo {
|
||||
host := u.Query().Get("host")
|
||||
path := u.Query().Get("path")
|
||||
username := u.Query().Get("userName")
|
||||
|
||||
return snapshot.SourceInfo{
|
||||
Host: host,
|
||||
Path: path,
|
||||
UserName: username,
|
||||
}
|
||||
|
||||
switch r.Method {
|
||||
case "GET":
|
||||
pol, err := policy.GetDefinedPolicy(ctx, s.rep, target)
|
||||
if err == policy.ErrPolicyNotFound {
|
||||
return nil, requestError(serverapi.ErrorNotFound, "policy not found")
|
||||
}
|
||||
|
||||
return pol, nil
|
||||
|
||||
case "DELETE":
|
||||
if err := policy.RemovePolicy(ctx, s.rep, target); err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
|
||||
return &serverapi.Empty{}, nil
|
||||
|
||||
case "PUT":
|
||||
newPolicy := &policy.Policy{}
|
||||
if err := json.NewDecoder(r.Body).Decode(newPolicy); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
|
||||
}
|
||||
|
||||
if err := policy.SetPolicy(ctx, s.rep, target, newPolicy); err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
|
||||
return &serverapi.Empty{}, nil
|
||||
|
||||
default:
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "incompatible HTTP method")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
pol, err := policy.GetDefinedPolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL))
|
||||
if err == policy.ErrPolicyNotFound {
|
||||
return nil, requestError(serverapi.ErrorNotFound, "policy not found")
|
||||
}
|
||||
|
||||
return pol, nil
|
||||
}
|
||||
|
||||
func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
if err := policy.RemovePolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL)); err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
|
||||
return &serverapi.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) handlePolicyPut(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
newPolicy := &policy.Policy{}
|
||||
if err := json.NewDecoder(r.Body).Decode(newPolicy); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
|
||||
}
|
||||
|
||||
if err := policy.SetPolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL), newPolicy); err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
|
||||
return &serverapi.Empty{}, nil
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interfa
|
||||
}, nil
|
||||
}
|
||||
|
||||
func maybeDecodeToken(req *serverapi.ConnectRequest) *apiError {
|
||||
func maybeDecodeToken(req *serverapi.ConnectRepositoryRequest) *apiError {
|
||||
if req.Token != "" {
|
||||
ci, password, err := repo.DecodeToken(req.Token)
|
||||
if err != nil {
|
||||
@@ -57,13 +57,13 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa
|
||||
return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected")
|
||||
}
|
||||
|
||||
var req serverapi.CreateRequest
|
||||
var req serverapi.CreateRepositoryRequest
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error())
|
||||
}
|
||||
|
||||
if err := maybeDecodeToken(&req.ConnectRequest); err != nil {
|
||||
if err := maybeDecodeToken(&req.ConnectRepositoryRequest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interf
|
||||
return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected")
|
||||
}
|
||||
|
||||
var req serverapi.ConnectRequest
|
||||
var req serverapi.ConnectRepositoryRequest
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error())
|
||||
|
||||
@@ -4,31 +4,13 @@
|
||||
"context"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/internal/serverapi"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
)
|
||||
|
||||
type snapshotListEntry struct {
|
||||
ID manifest.ID `json:"id"`
|
||||
Source snapshot.SourceInfo `json:"source"`
|
||||
Description string `json:"description"`
|
||||
StartTime time.Time `json:"startTime"`
|
||||
EndTime time.Time `json:"endTime"`
|
||||
IncompleteReason string `json:"incomplete,omitempty"`
|
||||
Summary *fs.DirectorySummary `json:"summary"`
|
||||
RootEntry string `json:"rootID"`
|
||||
RetentionReasons []string `json:"retention"`
|
||||
}
|
||||
|
||||
type snapshotListResponse struct {
|
||||
Snapshots []*snapshotListEntry `json:"snapshots"`
|
||||
}
|
||||
|
||||
func (s *Server) handleSourceSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
manifestIDs, err := snapshot.ListSnapshotManifests(ctx, s.rep, nil)
|
||||
if err != nil {
|
||||
return nil, internalServerError(err)
|
||||
@@ -39,8 +21,8 @@ func (s *Server) handleSourceSnapshotList(ctx context.Context, r *http.Request)
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
|
||||
resp := &snapshotListResponse{
|
||||
Snapshots: []*snapshotListEntry{},
|
||||
resp := &serverapi.SnapshotsResponse{
|
||||
Snapshots: []*serverapi.Snapshot{},
|
||||
}
|
||||
|
||||
groups := snapshot.GroupBySource(manifests)
|
||||
@@ -79,8 +61,8 @@ func sourceMatchesURLFilter(src snapshot.SourceInfo, query url.Values) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func convertSnapshotManifest(m *snapshot.Manifest) *snapshotListEntry {
|
||||
e := &snapshotListEntry{
|
||||
func convertSnapshotManifest(m *snapshot.Manifest) *serverapi.Snapshot {
|
||||
e := &serverapi.Snapshot{
|
||||
ID: m.ID,
|
||||
Source: m.Source,
|
||||
Description: m.Description,
|
||||
111
internal/server/api_sources.go
Normal file
111
internal/server/api_sources.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/serverapi"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
)
|
||||
|
||||
func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
resp := &serverapi.SourcesResponse{
|
||||
Sources: []*serverapi.SourceStatus{},
|
||||
}
|
||||
|
||||
for _, v := range s.sourceManagers {
|
||||
if !sourceMatchesURLFilter(v.src, r.URL.Query()) {
|
||||
continue
|
||||
}
|
||||
|
||||
resp.Sources = append(resp.Sources, v.Status())
|
||||
}
|
||||
|
||||
sort.Slice(resp.Sources, func(i, j int) bool {
|
||||
return resp.Sources[i].Source.String() < resp.Sources[j].Source.String()
|
||||
})
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleSourcesCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
var req serverapi.CreateSnapshotSourceRequest
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
|
||||
}
|
||||
|
||||
if req.Path == "" {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "missing path")
|
||||
}
|
||||
|
||||
_, err := os.Stat(req.Path)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, requestError(serverapi.ErrorPathNotFound, "path does not exist")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
|
||||
sourceInfo := snapshot.SourceInfo{
|
||||
UserName: s.options.Username,
|
||||
Host: s.options.Hostname,
|
||||
Path: req.Path,
|
||||
}
|
||||
|
||||
// ensure we have the policy for this source, otherwise it will not show up in the
|
||||
// list of sources at all.
|
||||
_, err = policy.GetDefinedPolicy(ctx, s.rep, sourceInfo)
|
||||
switch err {
|
||||
case nil:
|
||||
log.Debugf("policy for %v already exists", sourceInfo)
|
||||
// have policy, do nothing
|
||||
|
||||
case policy.ErrPolicyNotFound:
|
||||
// don't have policy - create an empty one
|
||||
log.Debugf("policy for %v not found, creating empty one", sourceInfo)
|
||||
|
||||
if err = policy.SetPolicy(ctx, s.rep, sourceInfo, &policy.Policy{}); err != nil {
|
||||
return nil, internalServerError(errors.Wrap(err, "unable to set policy"))
|
||||
}
|
||||
|
||||
if err = s.rep.Flush(ctx); err != nil {
|
||||
return nil, internalServerError(errors.Wrap(err, "unable to flush"))
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
|
||||
// upgrade to exclusive lock to ensure we have source manager
|
||||
s.mu.RUnlock()
|
||||
s.mu.Lock()
|
||||
if s.sourceManagers[sourceInfo] == nil {
|
||||
log.Debugf("creating source manager for %v", sourceInfo)
|
||||
sm := newSourceManager(sourceInfo, s)
|
||||
s.sourceManagers[sourceInfo] = sm
|
||||
|
||||
go sm.run(context.Background())
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.mu.RLock()
|
||||
|
||||
manager := s.sourceManagers[sourceInfo]
|
||||
if manager == nil {
|
||||
return nil, internalServerError(errors.Errorf("could not find source manager that was just created"))
|
||||
}
|
||||
|
||||
if req.CreateSnapshot {
|
||||
log.Debugf("scheduling snapshot of %v immediately...", sourceInfo)
|
||||
manager.scheduleSnapshotNow()
|
||||
}
|
||||
|
||||
return &serverapi.Empty{}, nil
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sort"
|
||||
|
||||
"github.com/kopia/kopia/internal/serverapi"
|
||||
)
|
||||
|
||||
func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
resp := &serverapi.SourcesResponse{
|
||||
Sources: []*serverapi.SourceStatus{},
|
||||
}
|
||||
|
||||
for _, v := range s.sourceManagers {
|
||||
if !sourceMatchesURLFilter(v.src, r.URL.Query()) {
|
||||
continue
|
||||
}
|
||||
|
||||
resp.Sources = append(resp.Sources, v.Status())
|
||||
}
|
||||
|
||||
sort.Slice(resp.Sources, func(i, j int) bool {
|
||||
return resp.Sources[i].Source.String() < resp.Sources[j].Source.String()
|
||||
})
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
@@ -16,6 +16,7 @@
|
||||
"github.com/kopia/kopia/internal/serverapi"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
)
|
||||
|
||||
var log = kopialogging.Logger("kopia/server")
|
||||
@@ -37,32 +38,37 @@ type Server struct {
|
||||
|
||||
// APIHandlers handles API requests.
|
||||
func (s *Server) APIHandlers() http.Handler {
|
||||
mux := mux.NewRouter()
|
||||
m := mux.NewRouter()
|
||||
|
||||
mux.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesList)).Methods("GET")
|
||||
mux.HandleFunc("/api/v1/snapshots", s.handleAPI(s.handleSourceSnapshotList)).Methods("GET")
|
||||
mux.HandleFunc("/api/v1/policies", s.handleAPI(s.handlePolicyList)).Methods("GET")
|
||||
mux.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyCRUD)).Methods("GET", "PUT", "DELETE")
|
||||
// sources
|
||||
m.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesList)).Methods("GET")
|
||||
m.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesCreate)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/sources/upload", s.handleAPI(s.handleUpload)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/sources/cancel", s.handleAPI(s.handleCancel)).Methods("POST")
|
||||
|
||||
mux.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh)).Methods("POST")
|
||||
mux.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush)).Methods("POST")
|
||||
mux.HandleFunc("/api/v1/shutdown", s.handleAPIPossiblyNotConnected(s.handleShutdown)).Methods("POST")
|
||||
// snapshots
|
||||
m.HandleFunc("/api/v1/snapshots", s.handleAPI(s.handleSnapshotList)).Methods("GET")
|
||||
|
||||
mux.HandleFunc("/api/v1/sources/pause", s.handleAPI(s.handlePause)).Methods("POST")
|
||||
mux.HandleFunc("/api/v1/sources/resume", s.handleAPI(s.handleResume)).Methods("POST")
|
||||
mux.HandleFunc("/api/v1/sources/upload", s.handleAPI(s.handleUpload)).Methods("POST")
|
||||
mux.HandleFunc("/api/v1/sources/cancel", s.handleAPI(s.handleCancel)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyGet)).Methods("GET")
|
||||
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyPut)).Methods("PUT")
|
||||
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyDelete)).Methods("DELETE")
|
||||
|
||||
mux.HandleFunc("/api/v1/objects/", s.handleObjectGet)
|
||||
m.HandleFunc("/api/v1/policies", s.handleAPI(s.handlePolicyList)).Methods("GET")
|
||||
|
||||
mux.HandleFunc("/api/v1/repo/status", s.handleAPIPossiblyNotConnected(s.handleRepoStatus)).Methods("GET")
|
||||
mux.HandleFunc("/api/v1/repo/connect", s.handleAPIPossiblyNotConnected(s.handleRepoConnect)).Methods("POST")
|
||||
mux.HandleFunc("/api/v1/repo/create", s.handleAPIPossiblyNotConnected(s.handleRepoCreate)).Methods("POST")
|
||||
mux.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect)).Methods("POST")
|
||||
mux.HandleFunc("/api/v1/repo/algorithms", s.handleAPIPossiblyNotConnected(s.handleRepoSupportedAlgorithms)).Methods("GET")
|
||||
mux.HandleFunc("/api/v1/repo/sync", s.handleAPI(s.handleRepoSync)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/shutdown", s.handleAPIPossiblyNotConnected(s.handleShutdown)).Methods("POST")
|
||||
|
||||
return mux
|
||||
m.HandleFunc("/api/v1/objects/", s.handleObjectGet).Methods("GET")
|
||||
|
||||
m.HandleFunc("/api/v1/repo/status", s.handleAPIPossiblyNotConnected(s.handleRepoStatus)).Methods("GET")
|
||||
m.HandleFunc("/api/v1/repo/connect", s.handleAPIPossiblyNotConnected(s.handleRepoConnect)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/repo/create", s.handleAPIPossiblyNotConnected(s.handleRepoCreate)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect)).Methods("POST")
|
||||
m.HandleFunc("/api/v1/repo/algorithms", s.handleAPIPossiblyNotConnected(s.handleRepoSupportedAlgorithms)).Methods("GET")
|
||||
m.HandleFunc("/api/v1/repo/sync", s.handleAPI(s.handleRepoSync)).Methods("POST")
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.HandlerFunc {
|
||||
@@ -152,14 +158,6 @@ func (s *Server) handleUpload(ctx context.Context, r *http.Request) (interface{}
|
||||
return s.forAllSourceManagersMatchingURLFilter((*sourceManager).upload, r.URL.Query())
|
||||
}
|
||||
|
||||
func (s *Server) handlePause(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
return s.forAllSourceManagersMatchingURLFilter((*sourceManager).pause, r.URL.Query())
|
||||
}
|
||||
|
||||
func (s *Server) handleResume(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
return s.forAllSourceManagersMatchingURLFilter((*sourceManager).resume, r.URL.Query())
|
||||
}
|
||||
|
||||
func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
return s.forAllSourceManagersMatchingURLFilter((*sourceManager).cancel, r.URL.Query())
|
||||
}
|
||||
@@ -234,7 +232,7 @@ func (s *Server) refreshPeriodically(ctx context.Context, r *repo.Repository) {
|
||||
log.Warningf("error refreshing repository: %v", err)
|
||||
}
|
||||
|
||||
if err := s.syncSourcesLocked(ctx); err != nil {
|
||||
if err := s.SyncSources(ctx); err != nil {
|
||||
log.Warningf("unable to sync sources: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -270,11 +268,28 @@ func (s *Server) stopAllSourceManagersLocked() {
|
||||
}
|
||||
|
||||
func (s *Server) syncSourcesLocked(ctx context.Context) error {
|
||||
sources, err := snapshot.ListSources(ctx, s.rep)
|
||||
sources := map[snapshot.SourceInfo]bool{}
|
||||
|
||||
snapshotSources, err := snapshot.ListSources(ctx, s.rep)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to list sources")
|
||||
}
|
||||
|
||||
policies, err := policy.ListPolicies(ctx, s.rep)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to list sources")
|
||||
}
|
||||
|
||||
for _, ss := range snapshotSources {
|
||||
sources[ss] = true
|
||||
}
|
||||
|
||||
for _, pol := range policies {
|
||||
if pol.Target().Path != "" && pol.Target().Host != "" && pol.Target().UserName != "" {
|
||||
sources[pol.Target()] = true
|
||||
}
|
||||
}
|
||||
|
||||
// copy existing sources to a map, from which we will remove sources that are found
|
||||
// in the repository
|
||||
oldSourceManagers := map[snapshot.SourceInfo]*sourceManager{}
|
||||
@@ -282,7 +297,7 @@ func (s *Server) syncSourcesLocked(ctx context.Context) error {
|
||||
oldSourceManagers[k] = v
|
||||
}
|
||||
|
||||
for _, src := range sources {
|
||||
for src := range sources {
|
||||
if _, ok := oldSourceManagers[src]; ok {
|
||||
// pre-existing source, already has a manager
|
||||
delete(oldSourceManagers, src)
|
||||
|
||||
@@ -28,10 +28,11 @@
|
||||
type sourceManager struct {
|
||||
snapshotfs.NullUploadProgress
|
||||
|
||||
server *Server
|
||||
src snapshot.SourceInfo
|
||||
closed chan struct{}
|
||||
wg sync.WaitGroup
|
||||
server *Server
|
||||
src snapshot.SourceInfo
|
||||
closed chan struct{}
|
||||
snapshotRequests chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
mu sync.RWMutex
|
||||
uploader *snapshotfs.Uploader
|
||||
@@ -51,11 +52,14 @@ func (s *sourceManager) Status() *serverapi.SourceStatus {
|
||||
st := &serverapi.SourceStatus{
|
||||
Source: s.src,
|
||||
Status: s.state,
|
||||
LastSnapshotTime: s.lastSnapshot.StartTime,
|
||||
NextSnapshotTime: s.nextSnapshotTime,
|
||||
SchedulingPolicy: s.pol,
|
||||
}
|
||||
|
||||
if ls := s.lastSnapshot; ls != nil {
|
||||
st.LastSnapshotTime = &ls.StartTime
|
||||
}
|
||||
|
||||
if st.Status == "SNAPSHOTTING" {
|
||||
c := s.progress.Snapshot()
|
||||
|
||||
@@ -120,6 +124,12 @@ func (s *sourceManager) runLocal(ctx context.Context) {
|
||||
case <-s.closed:
|
||||
return
|
||||
|
||||
case <-s.snapshotRequests:
|
||||
nt := time.Now()
|
||||
s.nextSnapshotTime = &nt
|
||||
|
||||
continue
|
||||
|
||||
case <-time.After(statusRefreshInterval):
|
||||
s.refreshStatus(ctx)
|
||||
|
||||
@@ -146,23 +156,28 @@ func (s *sourceManager) runRemote(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sourceManager) scheduleSnapshotNow() {
|
||||
select {
|
||||
case s.snapshotRequests <- struct{}{}: // scheduled snapshot
|
||||
default: // already scheduled
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sourceManager) upload() serverapi.SourceActionResponse {
|
||||
log.Infof("upload triggered via API: %v", s.src)
|
||||
s.scheduleSnapshotNow()
|
||||
|
||||
return serverapi.SourceActionResponse{Success: true}
|
||||
}
|
||||
|
||||
func (s *sourceManager) cancel() serverapi.SourceActionResponse {
|
||||
log.Infof("cancel triggered via API: %v", s.src)
|
||||
return serverapi.SourceActionResponse{Success: true}
|
||||
}
|
||||
|
||||
func (s *sourceManager) pause() serverapi.SourceActionResponse {
|
||||
log.Infof("pause triggered via API: %v", s.src)
|
||||
return serverapi.SourceActionResponse{Success: true}
|
||||
}
|
||||
if u := s.currentUploader(); u != nil {
|
||||
log.Infof("canceling current upload")
|
||||
u.Cancel()
|
||||
}
|
||||
|
||||
func (s *sourceManager) resume() serverapi.SourceActionResponse {
|
||||
log.Infof("resume triggered via API: %v", s.src)
|
||||
return serverapi.SourceActionResponse{Success: true}
|
||||
}
|
||||
|
||||
@@ -296,11 +311,12 @@ func (s *sourceManager) refreshStatus(ctx context.Context) {
|
||||
|
||||
func newSourceManager(src snapshot.SourceInfo, server *Server) *sourceManager {
|
||||
m := &sourceManager{
|
||||
src: src,
|
||||
server: server,
|
||||
state: "UNKNOWN",
|
||||
closed: make(chan struct{}),
|
||||
progress: &snapshotfs.CountingUploadProgress{},
|
||||
src: src,
|
||||
server: server,
|
||||
state: "UNKNOWN",
|
||||
closed: make(chan struct{}),
|
||||
snapshotRequests: make(chan struct{}, 1),
|
||||
progress: &snapshotfs.CountingUploadProgress{},
|
||||
}
|
||||
|
||||
return m
|
||||
|
||||
@@ -1,14 +1,44 @@
|
||||
package serverapi
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
)
|
||||
|
||||
// CreateSnapshotSource creates snapshot source with a given path.
|
||||
func (c *Client) CreateSnapshotSource(ctx context.Context, req *CreateSnapshotSourceRequest) error {
|
||||
return c.Post("sources", req, &CreateSnapshotSourceRequest{})
|
||||
}
|
||||
|
||||
// UploadSnapshots triggers snapshot upload on matching snapshots.
|
||||
func (c *Client) UploadSnapshots(ctx context.Context, match *snapshot.SourceInfo) (*MultipleSourceActionResponse, error) {
|
||||
resp := &MultipleSourceActionResponse{}
|
||||
if err := c.Post("sources/upload"+matchSourceParameters(match), &Empty{}, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// CancelUpload cancels snapshot upload on matching snapshots.
|
||||
func (c *Client) CancelUpload(ctx context.Context, match *snapshot.SourceInfo) (*MultipleSourceActionResponse, error) {
|
||||
resp := &MultipleSourceActionResponse{}
|
||||
if err := c.Post("sources/cancel"+matchSourceParameters(match), &Empty{}, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// CreateRepository invokes the 'repo/create' API.
|
||||
func (c *Client) CreateRepository(ctx context.Context, req *CreateRequest) error {
|
||||
func (c *Client) CreateRepository(ctx context.Context, req *CreateRepositoryRequest) error {
|
||||
return c.Post("repo/create", req, &StatusResponse{})
|
||||
}
|
||||
|
||||
// ConnectToRepository invokes the 'repo/connect' API.
|
||||
func (c *Client) ConnectToRepository(ctx context.Context, req *ConnectRequest) error {
|
||||
func (c *Client) ConnectToRepository(ctx context.Context, req *ConnectRepositoryRequest) error {
|
||||
return c.Post("repo/connect", req, &StatusResponse{})
|
||||
}
|
||||
|
||||
@@ -32,12 +62,47 @@ func (c *Client) Status(ctx context.Context) (*StatusResponse, error) {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Sources invokes the 'sources' API.
|
||||
func (c *Client) Sources(ctx context.Context) (*SourcesResponse, error) {
|
||||
// ListSources lists the snapshot sources managed by the server.
|
||||
func (c *Client) ListSources(ctx context.Context, match *snapshot.SourceInfo) (*SourcesResponse, error) {
|
||||
resp := &SourcesResponse{}
|
||||
if err := c.Get("sources", resp); err != nil {
|
||||
if err := c.Get("sources"+matchSourceParameters(match), resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// ListSnapshots invokes the 'sources' API.
|
||||
func (c *Client) ListSnapshots(ctx context.Context, match *snapshot.SourceInfo) (*SnapshotsResponse, error) {
|
||||
resp := &SnapshotsResponse{}
|
||||
if err := c.Get("snapshots"+matchSourceParameters(match), resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func matchSourceParameters(match *snapshot.SourceInfo) string {
|
||||
if match == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
var clauses []string
|
||||
if v := match.Host; v != "" {
|
||||
clauses = append(clauses, "host="+v)
|
||||
}
|
||||
|
||||
if v := match.UserName; v != "" {
|
||||
clauses = append(clauses, "username="+v)
|
||||
}
|
||||
|
||||
if v := match.Path; v != "" {
|
||||
clauses = append(clauses, "path="+v)
|
||||
}
|
||||
|
||||
if len(clauses) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
return "?" + strings.Join(clauses, "&")
|
||||
}
|
||||
|
||||
@@ -4,8 +4,10 @@
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
@@ -33,7 +35,7 @@ type SourceStatus struct {
|
||||
Source snapshot.SourceInfo `json:"source"`
|
||||
Status string `json:"status"`
|
||||
SchedulingPolicy policy.SchedulingPolicy `json:"schedule"`
|
||||
LastSnapshotTime time.Time `json:"lastSnapshotTime,omitempty"`
|
||||
LastSnapshotTime *time.Time `json:"lastSnapshotTime,omitempty"`
|
||||
NextSnapshotTime *time.Time `json:"nextSnapshotTime,omitempty"`
|
||||
UploadCounters *snapshotfs.UploadCounters `json:"upload,omitempty"`
|
||||
}
|
||||
@@ -60,15 +62,16 @@ type Empty struct {
|
||||
// Supported error codes.
|
||||
const (
|
||||
ErrorInternal APIErrorCode = "INTERNAL"
|
||||
ErrorMalformedRequest APIErrorCode = "MALFORMED_REQUEST"
|
||||
ErrorInvalidToken APIErrorCode = "INVALID_TOKEN"
|
||||
ErrorStorageConnection APIErrorCode = "STORAGE_CONNECTION"
|
||||
ErrorAlreadyInitialized APIErrorCode = "ALREADY_INITIALIZED"
|
||||
ErrorNotInitialized APIErrorCode = "NOT_INITIALIZED"
|
||||
ErrorAlreadyConnected APIErrorCode = "ALREADY_CONNECTED"
|
||||
ErrorNotConnected APIErrorCode = "NOT_CONNECTED"
|
||||
ErrorAlreadyInitialized APIErrorCode = "ALREADY_INITIALIZED"
|
||||
ErrorInvalidPassword APIErrorCode = "INVALID_PASSWORD"
|
||||
ErrorInvalidToken APIErrorCode = "INVALID_TOKEN"
|
||||
ErrorMalformedRequest APIErrorCode = "MALFORMED_REQUEST"
|
||||
ErrorNotConnected APIErrorCode = "NOT_CONNECTED"
|
||||
ErrorNotFound APIErrorCode = "NOT_FOUND"
|
||||
ErrorNotInitialized APIErrorCode = "NOT_INITIALIZED"
|
||||
ErrorPathNotFound APIErrorCode = "PATH_NOT_FOUND"
|
||||
ErrorStorageConnection APIErrorCode = "STORAGE_CONNECTION"
|
||||
)
|
||||
|
||||
// ErrorResponse represents error response.
|
||||
@@ -87,14 +90,14 @@ type MultipleSourceActionResponse struct {
|
||||
Sources map[string]SourceActionResponse `json:"sources"`
|
||||
}
|
||||
|
||||
// CreateRequest contains request to create a repository in a given storage
|
||||
type CreateRequest struct {
|
||||
ConnectRequest
|
||||
// CreateRepositoryRequest contains request to create a repository in a given storage
|
||||
type CreateRepositoryRequest struct {
|
||||
ConnectRepositoryRequest
|
||||
NewRepositoryOptions repo.NewRepositoryOptions `json:"options"`
|
||||
}
|
||||
|
||||
// ConnectRequest contains request to connect to a repository.
|
||||
type ConnectRequest struct {
|
||||
// ConnectRepositoryRequest contains request to connect to a repository.
|
||||
type ConnectRepositoryRequest struct {
|
||||
Storage blob.ConnectionInfo `json:"storage"`
|
||||
Password string `json:"password"`
|
||||
Token string `json:"token"` // when set, overrides Storage and Password
|
||||
@@ -110,3 +113,27 @@ type SupportedAlgorithmsResponse struct {
|
||||
SplitterAlgorithms []string `json:"splitter"`
|
||||
CompressionAlgorithms []string `json:"compression"`
|
||||
}
|
||||
|
||||
// CreateSnapshotSourceRequest contains request to create snapshot source and optionally create first snapshot.
|
||||
type CreateSnapshotSourceRequest struct {
|
||||
Path string `json:"path"`
|
||||
CreateSnapshot bool `json:"createSnapshot"`
|
||||
}
|
||||
|
||||
// Snapshot describes single snapshot entry.
|
||||
type Snapshot struct {
|
||||
ID manifest.ID `json:"id"`
|
||||
Source snapshot.SourceInfo `json:"source"`
|
||||
Description string `json:"description"`
|
||||
StartTime time.Time `json:"startTime"`
|
||||
EndTime time.Time `json:"endTime"`
|
||||
IncompleteReason string `json:"incomplete,omitempty"`
|
||||
Summary *fs.DirectorySummary `json:"summary"`
|
||||
RootEntry string `json:"rootID"`
|
||||
RetentionReasons []string `json:"retention"`
|
||||
}
|
||||
|
||||
// SnapshotsResponse contains a list of snapshots.
|
||||
type SnapshotsResponse struct {
|
||||
Snapshots []*Snapshot `json:"snapshots"`
|
||||
}
|
||||
|
||||
@@ -6,9 +6,13 @@
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/retry"
|
||||
"github.com/kopia/kopia/internal/serverapi"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/filesystem"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/tests/testenv"
|
||||
)
|
||||
|
||||
@@ -80,7 +84,7 @@ func TestServerStart(t *testing.T) {
|
||||
t.Errorf("unexpected storage type: %v, want %v", got, want)
|
||||
}
|
||||
|
||||
sources, err := cli.Sources(ctx)
|
||||
sources, err := cli.ListSources(ctx, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("error listing sources: %v", err)
|
||||
}
|
||||
@@ -92,6 +96,30 @@ func TestServerStart(t *testing.T) {
|
||||
if got, want := sources.Sources[0].Source.Path, sharedTestDataDir1; got != want {
|
||||
t.Errorf("unexpected source path: %v, want %v", got, want)
|
||||
}
|
||||
|
||||
if err = cli.CreateSnapshotSource(ctx, &serverapi.CreateSnapshotSourceRequest{
|
||||
Path: sharedTestDataDir2,
|
||||
}); err != nil {
|
||||
t.Fatalf("create snapshot source error: %v", err)
|
||||
}
|
||||
|
||||
verifySourceCount(t, cli, nil, 2)
|
||||
verifySourceCount(t, cli, &snapshot.SourceInfo{Host: "no-such-host"}, 0)
|
||||
verifySourceCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}, 1)
|
||||
|
||||
verifySnapshotCount(t, cli, nil, 2)
|
||||
verifySnapshotCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir1}, 2)
|
||||
verifySnapshotCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}, 0)
|
||||
verifySnapshotCount(t, cli, &snapshot.SourceInfo{Host: "no-such-host"}, 0)
|
||||
|
||||
uploadMatchingSnapshots(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2})
|
||||
waitForSnapshotCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}, 1)
|
||||
|
||||
if _, err = cli.CancelUpload(ctx, nil); err != nil {
|
||||
t.Fatalf("cancel failed: %v", err)
|
||||
}
|
||||
|
||||
verifySnapshotCount(t, cli, &snapshot.SourceInfo{Path: sharedTestDataDir2}, 1)
|
||||
}
|
||||
|
||||
func TestServerStartWithoutInitialRepository(t *testing.T) {
|
||||
@@ -129,8 +157,8 @@ func TestServerStartWithoutInitialRepository(t *testing.T) {
|
||||
waitUntilServerStarted(t, cli)
|
||||
verifyServerConnected(t, cli, false)
|
||||
|
||||
if err = cli.CreateRepository(ctx, &serverapi.CreateRequest{
|
||||
ConnectRequest: serverapi.ConnectRequest{
|
||||
if err = cli.CreateRepository(ctx, &serverapi.CreateRepositoryRequest{
|
||||
ConnectRepositoryRequest: serverapi.ConnectRepositoryRequest{
|
||||
Password: "foofoo",
|
||||
Storage: connInfo,
|
||||
},
|
||||
@@ -146,7 +174,7 @@ func TestServerStartWithoutInitialRepository(t *testing.T) {
|
||||
|
||||
verifyServerConnected(t, cli, false)
|
||||
|
||||
if err = cli.ConnectToRepository(ctx, &serverapi.ConnectRequest{
|
||||
if err = cli.ConnectToRepository(ctx, &serverapi.ConnectRepositoryRequest{
|
||||
Password: "foofoo",
|
||||
Storage: connInfo,
|
||||
}); err != nil {
|
||||
@@ -171,6 +199,60 @@ func verifyServerConnected(t *testing.T, cli *serverapi.Client, want bool) *serv
|
||||
return st
|
||||
}
|
||||
|
||||
func waitForSnapshotCount(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) {
|
||||
t.Helper()
|
||||
|
||||
err := retry.WithExponentialBackoffNoValue("wait for snapshots", func() error {
|
||||
snapshots, err := cli.ListSnapshots(context.Background(), match)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error listing sources")
|
||||
}
|
||||
|
||||
if got := len(snapshots.Snapshots); got != want {
|
||||
return errors.Errorf("unexpected number of snapshots %v, want %v", got, want)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func(err error) bool { return err != nil })
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func uploadMatchingSnapshots(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo) {
|
||||
t.Helper()
|
||||
|
||||
if _, err := cli.UploadSnapshots(context.Background(), match); err != nil {
|
||||
t.Fatalf("upload failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func verifySnapshotCount(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) {
|
||||
t.Helper()
|
||||
|
||||
snapshots, err := cli.ListSnapshots(context.Background(), match)
|
||||
if err != nil {
|
||||
t.Fatalf("error listing sources: %v", err)
|
||||
}
|
||||
|
||||
if got := len(snapshots.Snapshots); got != want {
|
||||
t.Errorf("unexpected number of snapshots %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func verifySourceCount(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) {
|
||||
t.Helper()
|
||||
|
||||
sources, err := cli.ListSources(context.Background(), match)
|
||||
if err != nil {
|
||||
t.Fatalf("error listing sources: %v", err)
|
||||
}
|
||||
|
||||
if got := len(sources.Sources); got != want {
|
||||
t.Errorf("unexpected number of sources %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func waitUntilServerStarted(t *testing.T, cli *serverapi.Client) {
|
||||
for i := 0; i < 60; i++ {
|
||||
if _, err := cli.Status(context.Background()); err == nil {
|
||||
|
||||
Reference in New Issue
Block a user