server: added support for new verbs in the API

/api/v1/repo/create
/api/v1/repo/connect
/api/v1/repo/disconnect

Refactored server code and fixed a number of outstanding robustness
issues. Tweaked the API responses a bit to make more sense when consumed
by the UI.
This commit is contained in:
Jarek Kowalski
2020-02-11 20:43:26 -08:00
parent 69d1ccbedf
commit 0f79279f5e
14 changed files with 588 additions and 160 deletions

View File

@@ -65,6 +65,14 @@ func serverAction(act func(ctx context.Context, cli *serverapi.Client) error) fu
}
func repositoryAction(act func(ctx context.Context, rep *repo.Repository) error) func(ctx *kingpin.ParseContext) error {
return maybeRepositoryAction(act, true)
}
func optionalRepositoryAction(act func(ctx context.Context, rep *repo.Repository) error) func(ctx *kingpin.ParseContext) error {
return maybeRepositoryAction(act, false)
}
func maybeRepositoryAction(act func(ctx context.Context, rep *repo.Repository) error, required bool) func(ctx *kingpin.ParseContext) error {
return func(kpc *kingpin.ParseContext) error {
return withProfiling(func() error {
startMemoryTracking()
@@ -80,14 +88,16 @@ func repositoryAction(act func(ctx context.Context, rep *repo.Repository) error)
}
})
rep, err := openRepository(ctx, nil)
if err != nil {
rep, err := openRepository(ctx, nil, required)
if err != nil && required {
return errors.Wrap(err, "open repository")
}
err = act(ctx, rep)
if cerr := rep.Close(ctx); cerr != nil {
return errors.Wrap(cerr, "unable to close repository")
if rep != nil {
if cerr := rep.Close(ctx); cerr != nil {
return errors.Wrap(cerr, "unable to close repository")
}
}
return err
})

View File

@@ -30,16 +30,25 @@
func init() {
addUserAndHostFlags(serverStartCommand)
serverStartCommand.Action(repositoryAction(runServer))
setupConnectOptions(serverStartCommand)
serverStartCommand.Action(optionalRepositoryAction(runServer))
}
func runServer(ctx context.Context, rep *repo.Repository) error {
srv, err := server.New(ctx, rep, getHostName(), getUserName())
srv, err := server.New(ctx, rep, server.Options{
ConfigFile: repositoryConfigFileName(),
Hostname: getHostName(),
Username: getUserName(),
ConnectOptions: connectOptions(),
RefreshInterval: *serverStartRefreshInterval,
})
if err != nil {
return errors.Wrap(err, "unable to initialize server")
}
go rep.RefreshPeriodically(ctx, *serverStartRefreshInterval)
if err = srv.SetRepository(ctx, rep); err != nil {
return errors.Wrap(err, "error connecting to repository")
}
mux := http.NewServeMux()
mux.Handle("/api/", srv.APIHandlers())
@@ -54,6 +63,14 @@ func runServer(ctx context.Context, rep *repo.Repository) error {
httpServer := &http.Server{Addr: stripProtocol(*serverAddress)}
srv.OnShutdown = httpServer.Shutdown
onCtrlC(func() {
log.Infof("Shutting down...")
if err = httpServer.Shutdown(ctx); err != nil {
log.Warningf("unable to shut down: %v", err)
}
})
handler := addInterceptors(mux)
if as := *serverStartAutoShutdown; as > 0 {
@@ -68,11 +85,13 @@ func runServer(ctx context.Context, rep *repo.Repository) error {
httpServer.Handler = handler
err = startServerWithOptionalTLS(httpServer)
if err == http.ErrServerClosed {
return nil
if err != http.ErrServerClosed {
return err
}
return err
srv.StopAllSourceManagers()
return nil
}
func stripProtocol(addr string) string {

View File

@@ -60,7 +60,11 @@ func waitForCtrlC() {
<-done
}
func openRepository(ctx context.Context, opts *repo.Options) (*repo.Repository, error) {
func openRepository(ctx context.Context, opts *repo.Options, required bool) (*repo.Repository, error) {
if _, err := os.Stat(repositoryConfigFileName()); os.IsNotExist(err) && !required {
return nil, nil
}
pass, err := getPasswordFromFlags(false, true)
if err != nil {
return nil, errors.Wrap(err, "get password")

View File

@@ -9,6 +9,10 @@ type apiError struct {
message string
}
func requestError(message string) *apiError {
return &apiError{400, message}
}
func internalServerError(err error) *apiError {
return &apiError{500, fmt.Sprintf("internal server error: %v", err)}
}

128
internal/server/api_repo.go Normal file
View File

@@ -0,0 +1,128 @@
package server
import (
"context"
"encoding/json"
"net/http"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
)
func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep == nil {
return &serverapi.StatusResponse{
Connected: false,
}, nil
}
return &serverapi.StatusResponse{
Connected: true,
ConfigFile: s.rep.ConfigFile,
CacheDir: s.rep.Content.CachingOptions.CacheDirectory,
Hash: s.rep.Content.Format.Hash,
Encryption: s.rep.Content.Format.Encryption,
MaxPackSize: s.rep.Content.Format.MaxPackSize,
Splitter: s.rep.Objects.Format.Splitter,
Storage: s.rep.Blobs.ConnectionInfo().Type,
}, nil
}
func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep != nil {
return nil, requestError("already connected")
}
var req serverapi.CreateRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, requestError("unable to decode request: " + err.Error())
}
st, err := blob.NewStorage(ctx, req.Storage)
if err != nil {
return nil, internalServerError(errors.Wrap(err, "unable to connect to storage"))
}
defer st.Close(ctx) //nolint:errcheck
if err := repo.Initialize(ctx, st, &req.NewRepositoryOptions, req.Password); err != nil {
return nil, internalServerError(errors.Wrap(err, "unable to initialize repository"))
}
return s.connectAndOpen(ctx, req.Storage, req.Password)
}
func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep != nil {
return nil, requestError("already connected")
}
var req serverapi.ConnectRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, requestError("unable to decode request: " + err.Error())
}
return s.connectAndOpen(ctx, req.Storage, req.Password)
}
func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, password string) (interface{}, *apiError) {
st, err := blob.NewStorage(ctx, conn)
if err != nil {
return nil, internalServerError(errors.Wrap(err, "can't open storage"))
}
defer st.Close(ctx) //nolint:errcheck
if err = repo.Connect(ctx, s.options.ConfigFile, st, password, s.options.ConnectOptions); err != nil {
return nil, internalServerError(errors.Wrap(err, "connect error"))
}
rep, err := repo.Open(ctx, s.options.ConfigFile, password, nil)
if err != nil {
return nil, internalServerError(errors.Wrap(err, "open error"))
}
// release shared lock so that SetRepository can acquire exclusive lock
s.mu.RUnlock()
err = s.SetRepository(ctx, rep)
s.mu.RLock()
if err != nil {
defer rep.Close(ctx) // nolint:errcheck
return nil, internalServerError(err)
}
return s.handleRepoStatus(ctx, &http.Request{})
}
func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep == nil {
return nil, requestError("already disconnected")
}
if err := repo.Disconnect(s.options.ConfigFile); err != nil {
return nil, internalServerError(err)
}
// release shared lock so that SetRepository can acquire exclusive lock
s.mu.RUnlock()
err := s.SetRepository(ctx, nil)
s.mu.RLock()
if err != nil {
return nil, internalServerError(err)
}
return &serverapi.Empty{}, nil
}
func (s *Server) handleRepoLock(ctx context.Context, r *http.Request) (interface{}, *apiError) {
return nil, &apiError{code: http.StatusNotImplemented}
}
func (s *Server) handleRepoUnlock(ctx context.Context, r *http.Request) (interface{}, *apiError) {
return nil, &apiError{code: http.StatusNotImplemented}
}

View File

@@ -1,21 +0,0 @@
package server
import (
"context"
"net/http"
"github.com/kopia/kopia/internal/serverapi"
)
func (s *Server) handleStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) {
bf := s.rep.Content.Format
bf.HMACSecret = nil
bf.MasterKey = nil
return &serverapi.StatusResponse{
ConfigFile: s.rep.ConfigFile,
CacheDir: s.rep.Content.CachingOptions.CacheDirectory,
BlockFormatting: bf,
Storage: s.rep.Blobs.ConnectionInfo().Type,
}, nil
}

View File

@@ -7,6 +7,7 @@
"net/http"
"net/url"
"sync"
"time"
"github.com/pkg/errors"
@@ -22,9 +23,12 @@
type Server struct {
OnShutdown func(ctx context.Context) error
hostname string
username string
rep *repo.Repository
options Options
rep *repo.Repository
cancelRep context.CancelFunc
// all API requests run with shared lock on this mutex
// administrative actions run with an exclusive lock and block API calls.
mu sync.RWMutex
sourceManagers map[snapshot.SourceInfo]*sourceManager
uploadSemaphore chan struct{}
@@ -34,26 +38,35 @@ type Server struct {
func (s *Server) APIHandlers() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/api/v1/status", s.handleAPI(s.handleStatus, "GET"))
mux.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesList, "GET"))
mux.HandleFunc("/api/v1/snapshots", s.handleAPI(s.handleSourceSnapshotList, "GET"))
mux.HandleFunc("/api/v1/policies", s.handleAPI(s.handlePolicyList, "GET"))
mux.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh, "POST"))
mux.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush, "POST"))
mux.HandleFunc("/api/v1/shutdown", s.handleAPI(s.handleShutdown, "POST"))
mux.HandleFunc("/api/v1/sources/pause", s.handleAPI(s.handlePause, "POST"))
mux.HandleFunc("/api/v1/sources/resume", s.handleAPI(s.handleResume, "POST"))
mux.HandleFunc("/api/v1/sources/upload", s.handleAPI(s.handleUpload, "POST"))
mux.HandleFunc("/api/v1/sources/cancel", s.handleAPI(s.handleCancel, "POST"))
mux.HandleFunc("/api/v1/objects/", s.handleObjectGet)
mux.HandleFunc("/api/v1/repo/status", s.handleAPI(s.handleRepoStatus, "GET"))
mux.HandleFunc("/api/v1/repo/connect", s.handleAPI(s.handleRepoConnect, "POST"))
mux.HandleFunc("/api/v1/repo/create", s.handleAPI(s.handleRepoCreate, "POST"))
mux.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect, "POST"))
mux.HandleFunc("/api/v1/repo/lock", s.handleAPI(s.handleRepoLock, "POST"))
mux.HandleFunc("/api/v1/repo/unlock", s.handleAPI(s.handleRepoUnlock, "POST"))
return mux
}
func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError), httpMethod string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
if r.Method != httpMethod {
http.Error(w, "incompatible HTTP method", http.StatusMethodNotAllowed)
@@ -74,7 +87,13 @@ func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interfa
return
}
http.Error(w, err.message, err.code)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(err.code)
_ = e.Encode(&serverapi.ErrorResponse{
Error: err.message,
})
}
}
@@ -91,10 +110,12 @@ func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{},
func (s *Server) handleShutdown(ctx context.Context, r *http.Request) (interface{}, *apiError) {
log.Infof("shutting down due to API request")
if s.OnShutdown != nil {
if err := s.OnShutdown(ctx); err != nil {
return nil, internalServerError(err)
}
if f := s.OnShutdown; f != nil {
go func() {
if err := f(ctx); err != nil {
log.Warningf("shutdown failed: %v", err)
}
}()
}
return &serverapi.Empty{}, nil
@@ -144,30 +165,141 @@ func (s *Server) endUpload(src snapshot.SourceInfo) {
<-s.uploadSemaphore
}
// SetRepository sets the repository (nil is allowed and indicates server that is not
// connected to the repository).
func (s *Server) SetRepository(ctx context.Context, rep *repo.Repository) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.rep == rep {
// nothing to do
return nil
}
if s.rep != nil {
// close previous source managers
s.stopAllSourceManagersLocked()
if err := s.rep.Close(ctx); err != nil {
return errors.Wrap(err, "unable to close previous repository")
}
cr := s.cancelRep
s.cancelRep = nil
if cr != nil {
cr()
}
}
s.rep = rep
if s.rep == nil {
return nil
}
if err := s.syncSourcesLocked(ctx); err != nil {
s.stopAllSourceManagersLocked()
s.rep = nil
return err
}
ctx, s.cancelRep = context.WithCancel(ctx)
go s.refreshPeriodically(ctx, rep)
return nil
}
func (s *Server) refreshPeriodically(ctx context.Context, r *repo.Repository) {
for {
select {
case <-ctx.Done():
return
case <-time.After(s.options.RefreshInterval):
if err := r.Refresh(ctx); err != nil {
log.Warningf("error refreshing repository: %v", err)
}
}
}
}
// StopAllSourceManagers causes all source managers to stop.
func (s *Server) StopAllSourceManagers() {
s.mu.Lock()
defer s.mu.Unlock()
s.stopAllSourceManagersLocked()
}
func (s *Server) stopAllSourceManagersLocked() {
for _, sm := range s.sourceManagers {
sm.stop()
}
for _, sm := range s.sourceManagers {
sm.waitUntilStopped()
}
s.sourceManagers = map[snapshot.SourceInfo]*sourceManager{}
}
func (s *Server) syncSourcesLocked(ctx context.Context) error {
sources, err := snapshot.ListSources(ctx, s.rep)
if err != nil {
return errors.Wrap(err, "unable to list sources")
}
// copy existing sources to a map, from which we will remove sources that are found
// in the repository
oldSourceManagers := map[snapshot.SourceInfo]*sourceManager{}
for k, v := range s.sourceManagers {
oldSourceManagers[k] = v
}
for _, src := range sources {
if _, ok := oldSourceManagers[src]; ok {
// pre-existing source, already has a manager
delete(oldSourceManagers, src)
} else {
sm := newSourceManager(src, s)
s.sourceManagers[src] = sm
go sm.run(ctx)
}
}
// whatever is left in oldSourceManagers are managers for sources that don't exist anymore.
// stop source manager for sources no longer in the repo.
for _, sm := range oldSourceManagers {
sm.stop()
}
for src, sm := range oldSourceManagers {
sm.waitUntilStopped()
delete(s.sourceManagers, src)
}
return nil
}
// Options encompasses all API server options.
type Options struct {
ConfigFile string
Hostname string
Username string
ConnectOptions *repo.ConnectOptions
RefreshInterval time.Duration
}
// New creates a Server on top of a given Repository.
// The server will manage sources for a given username@hostname.
func New(ctx context.Context, rep *repo.Repository, hostname, username string) (*Server, error) {
func New(ctx context.Context, rep *repo.Repository, options Options) (*Server, error) {
s := &Server{
hostname: hostname,
username: username,
rep: rep,
options: options,
sourceManagers: map[snapshot.SourceInfo]*sourceManager{},
uploadSemaphore: make(chan struct{}, 1),
}
sources, err := snapshot.ListSources(ctx, rep)
if err != nil {
return nil, errors.Wrap(err, "unable to list sources")
}
for _, src := range sources {
sm := newSourceManager(src, s)
s.sourceManagers[src] = sm
}
for _, src := range s.sourceManagers {
go src.run(ctx)
}
return s, nil
}

View File

@@ -31,18 +31,16 @@ type sourceManager struct {
server *Server
src snapshot.SourceInfo
closed chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
pol *policy.Policy
pol policy.SchedulingPolicy
state string
nextSnapshotTime time.Time
nextSnapshotTime *time.Time
lastCompleteSnapshot *snapshot.Manifest
lastSnapshot *snapshot.Manifest
// state of current upload
uploadPath string
uploadPathCompleted int64
uploadPathTotal int64
progress *snapshotfs.CountingUploadProgress
}
func (s *sourceManager) Status() *serverapi.SourceStatus {
@@ -52,15 +50,16 @@ func (s *sourceManager) Status() *serverapi.SourceStatus {
st := &serverapi.SourceStatus{
Source: s.src,
Status: s.state,
LastSnapshotSize: s.lastSnapshot.Stats.TotalFileSize,
LastSnapshotTime: s.lastSnapshot.StartTime,
NextSnapshotTime: s.nextSnapshotTime,
Policy: s.pol,
SchedulingPolicy: s.pol,
}
st.UploadStatus.UploadingPath = s.uploadPath
st.UploadStatus.UploadingPathCompleted = s.uploadPathCompleted
st.UploadStatus.UploadingPathTotal = s.uploadPathTotal
if st.Status == "SNAPSHOTTING" {
c := s.progress.Snapshot()
st.UploadCounters = &c
}
return st
}
@@ -75,9 +74,14 @@ func (s *sourceManager) run(ctx context.Context) {
s.setStatus("INITIALIZING")
defer s.setStatus("STOPPED")
if s.server.hostname == s.src.Host {
s.wg.Add(1)
defer s.wg.Done()
if s.server.options.Hostname == s.src.Host {
log.Debugf("starting local source manager for %v", s.src)
s.runLocal(ctx)
} else {
log.Debugf("starting remote source manager for %v", s.src)
s.runRemote(ctx)
}
}
@@ -86,13 +90,14 @@ func (s *sourceManager) runLocal(ctx context.Context) {
s.refreshStatus(ctx)
for {
var timeBeforeNextSnapshot time.Duration
var waitTime time.Duration
if !s.nextSnapshotTime.IsZero() {
timeBeforeNextSnapshot = time.Until(s.nextSnapshotTime)
log.Infof("time to next snapshot %v is %v", s.src, timeBeforeNextSnapshot)
if s.nextSnapshotTime != nil {
waitTime = time.Until(*s.nextSnapshotTime)
log.Debugf("time to next snapshot %v is %v", s.src, waitTime)
} else {
timeBeforeNextSnapshot = oneDay
log.Debugf("no scheduled snapshot for %v", s.src)
waitTime = oneDay
}
s.setStatus("WAITING")
@@ -103,8 +108,8 @@ func (s *sourceManager) runLocal(ctx context.Context) {
case <-time.After(statusRefreshInterval):
s.refreshStatus(ctx)
case <-time.After(timeBeforeNextSnapshot):
log.Infof("snapshotting %v", s.src)
case <-time.After(waitTime):
log.Debugf("snapshotting %v", s.src)
s.setStatus("SNAPSHOTTING")
s.snapshot(ctx)
s.refreshStatus(ctx)
@@ -126,25 +131,6 @@ func (s *sourceManager) runRemote(ctx context.Context) {
}
}
func (s *sourceManager) Progress(path string, numFiles int, pathCompleted, pathTotal int64, stats *snapshot.Stats) {
s.mu.Lock()
defer s.mu.Unlock()
s.uploadPath = path
s.uploadPathCompleted = pathCompleted
s.uploadPathTotal = pathTotal
log.Debugf("path: %v %v/%v", path, pathCompleted, pathTotal)
}
func (s *sourceManager) UploadFinished() {
s.mu.Lock()
defer s.mu.Unlock()
s.uploadPath = ""
s.uploadPathCompleted = 0
s.uploadPathTotal = 0
}
func (s *sourceManager) upload() serverapi.SourceActionResponse {
log.Infof("upload triggered via API: %v", s.src)
return serverapi.SourceActionResponse{Success: true}
@@ -165,6 +151,16 @@ func (s *sourceManager) resume() serverapi.SourceActionResponse {
return serverapi.SourceActionResponse{Success: true}
}
func (s *sourceManager) stop() {
log.Debugf("stopping source manager for %v", s.src)
close(s.closed)
}
func (s *sourceManager) waitUntilStopped() {
s.wg.Wait()
log.Debugf("source manager for %v has stopped", s.src)
}
func (s *sourceManager) snapshot(ctx context.Context) {
s.server.beginUpload(s.src)
defer s.server.endUpload(s.src)
@@ -182,7 +178,7 @@ func (s *sourceManager) snapshot(ctx context.Context) {
log.Errorf("unable to create policy getter: %v", err)
}
u.Progress = s
u.Progress = s.progress
log.Infof("starting upload of %v", s.src)
@@ -211,31 +207,26 @@ func (s *sourceManager) snapshot(ctx context.Context) {
}
}
func (s *sourceManager) findClosestNextSnapshotTime() time.Time {
nextSnapshotTime := time.Now().Add(oneDay)
func (s *sourceManager) findClosestNextSnapshotTime() *time.Time {
var nextSnapshotTime *time.Time
if s.pol != nil {
// compute next snapshot time based on interval
if interval := s.pol.SchedulingPolicy.IntervalSeconds; interval != 0 {
interval := time.Duration(interval) * time.Second
nt := s.lastSnapshot.StartTime.Add(interval).Truncate(interval)
// compute next snapshot time based on interval
if interval := s.pol.IntervalSeconds; interval != 0 {
interval := time.Duration(interval) * time.Second
nt := s.lastSnapshot.StartTime.Add(interval).Truncate(interval)
nextSnapshotTime = &nt
}
if nt.Before(nextSnapshotTime) {
nextSnapshotTime = nt
}
for _, tod := range s.pol.TimesOfDay {
nowLocalTime := time.Now().Local()
localSnapshotTime := time.Date(nowLocalTime.Year(), nowLocalTime.Month(), nowLocalTime.Day(), tod.Hour, tod.Minute, 0, 0, time.Local)
if tod.Hour < nowLocalTime.Hour() || (tod.Hour == nowLocalTime.Hour() && tod.Minute < nowLocalTime.Minute()) {
localSnapshotTime = localSnapshotTime.Add(oneDay)
}
for _, tod := range s.pol.SchedulingPolicy.TimesOfDay {
nowLocalTime := time.Now().Local()
localSnapshotTime := time.Date(nowLocalTime.Year(), nowLocalTime.Month(), nowLocalTime.Day(), tod.Hour, tod.Minute, 0, 0, time.Local)
if tod.Hour < nowLocalTime.Hour() || (tod.Hour == nowLocalTime.Hour() && tod.Minute < nowLocalTime.Minute()) {
localSnapshotTime = localSnapshotTime.Add(oneDay)
}
if localSnapshotTime.Before(nextSnapshotTime) {
nextSnapshotTime = localSnapshotTime
}
if nextSnapshotTime == nil || localSnapshotTime.Before(*nextSnapshotTime) {
nextSnapshotTime = &localSnapshotTime
}
}
@@ -251,7 +242,7 @@ func (s *sourceManager) refreshStatus(ctx context.Context) {
return
}
s.pol = pol
s.pol = pol.SchedulingPolicy
snapshots, err := snapshot.ListSnapshots(ctx, s.server.rep, s.src)
if err != nil {
@@ -266,17 +257,18 @@ func (s *sourceManager) refreshStatus(ctx context.Context) {
s.lastSnapshot = snaps[0]
s.nextSnapshotTime = s.findClosestNextSnapshotTime()
} else {
s.nextSnapshotTime = time.Time{}
s.nextSnapshotTime = nil
s.lastSnapshot = nil
}
}
func newSourceManager(src snapshot.SourceInfo, server *Server) *sourceManager {
m := &sourceManager{
src: src,
server: server,
state: "UNKNOWN",
closed: make(chan struct{}),
src: src,
server: server,
state: "UNKNOWN",
closed: make(chan struct{}),
progress: &snapshotfs.CountingUploadProgress{},
}
return m

View File

@@ -9,9 +9,12 @@
"encoding/json"
"net/http"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/pkg/errors"
)
var log = kopialogging.Logger("kopia/client")
// DefaultUsername is the default username for Kopia server.
const DefaultUsername = "kopia"
@@ -27,6 +30,10 @@ func (c *Client) Get(path string, respPayload interface{}) error {
return err
}
if c.options.LogRequests {
log.Debugf("GET %v", c.options.BaseURL+path)
}
if c.options.Username != "" {
req.SetBasicAuth(c.options.Username, c.options.Password)
}
@@ -56,6 +63,10 @@ func (c *Client) Post(path string, reqPayload, respPayload interface{}) error {
return errors.Wrap(err, "unable to encode request")
}
if c.options.LogRequests {
log.Infof("POST %v (%v bytes)", c.options.BaseURL+path, buf.Len())
}
req, err := http.NewRequest("POST", c.options.BaseURL+path, &buf)
if err != nil {
return err
@@ -97,6 +108,8 @@ type ClientOptions struct {
TrustedServerCertificateFingerprint string
RootCAs *x509.CertPool
LogRequests bool
}
// NewClient creates a options.HTTPClient for connecting to Kopia HTTP API.

View File

@@ -0,0 +1,37 @@
package serverapi
import "context"
func (c *Client) CreateRepository(ctx context.Context, req *CreateRequest) error {
return c.Post("repo/create", req, &StatusResponse{})
}
func (c *Client) ConnectToRepository(ctx context.Context, req *ConnectRequest) error {
return c.Post("repo/connect", req, &StatusResponse{})
}
func (c *Client) DisconnectFromRepository(ctx context.Context) error {
return c.Post("repo/disconnect", &Empty{}, &Empty{})
}
func (c *Client) Shutdown(ctx context.Context) {
_ = c.Post("shutdown", &Empty{}, &Empty{})
}
func (c *Client) Status(ctx context.Context) (*StatusResponse, error) {
resp := &StatusResponse{}
if err := c.Get("repo/status", resp); err != nil {
return nil, err
}
return resp, nil
}
func (c *Client) Sources(ctx context.Context) (*SourcesResponse, error) {
resp := &SourcesResponse{}
if err := c.Get("sources", resp); err != nil {
return nil, err
}
return resp, nil
}

View File

@@ -4,17 +4,24 @@
import (
"time"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
// StatusResponse is the response of 'status' HTTP API command.
type StatusResponse struct {
ConfigFile string `json:"configFile"`
CacheDir string `json:"cacheDir"`
BlockFormatting content.FormattingOptions `json:"blockFormatting"`
Storage string `json:"storage"`
Connected bool `json:"connected"`
ConfigFile string `json:"configFile,omitempty"`
CacheDir string `json:"cacheDir,omitempty"`
Hash string `json:"hash,omitempty"`
Encryption string `json:"encryption,omitempty"`
Splitter string `json:"splitter,omitempty"`
MaxPackSize int `json:"maxPackSize,omitempty"`
Storage string `json:"storage,omitempty"`
}
// SourcesResponse is the response of 'sources' HTTP API command.
@@ -24,18 +31,12 @@ type SourcesResponse struct {
// SourceStatus describes the status of a single source.
type SourceStatus struct {
Source snapshot.SourceInfo `json:"source"`
Status string `json:"status"`
Policy *policy.Policy `json:"policy"`
LastSnapshotSize int64 `json:"lastSnapshotSize,omitempty"`
LastSnapshotTime time.Time `json:"lastSnapshotTime,omitempty"`
NextSnapshotTime time.Time `json:"nextSnapshotTime,omitempty"`
UploadStatus struct {
UploadingPath string `json:"path,omitempty"`
UploadingPathCompleted int64 `json:"pathCompleted,omitempty"`
UploadingPathTotal int64 `json:"pathTotal,omitempty"`
} `json:"upload"`
Source snapshot.SourceInfo `json:"source"`
Status string `json:"status"`
SchedulingPolicy policy.SchedulingPolicy `json:"schedule"`
LastSnapshotTime time.Time `json:"lastSnapshotTime,omitempty"`
NextSnapshotTime *time.Time `json:"nextSnapshotTime,omitempty"`
UploadCounters *snapshotfs.UploadCounters `json:"upload,omitempty"`
}
// PolicyListEntry describes single policy.
@@ -54,6 +55,11 @@ type PoliciesResponse struct {
type Empty struct {
}
// ErrorResponse represents error response.
type ErrorResponse struct {
Error string `json:"error"`
}
// SourceActionResponse is a per-source response.
type SourceActionResponse struct {
Success bool `json:"success"`
@@ -63,3 +69,18 @@ type SourceActionResponse struct {
type MultipleSourceActionResponse struct {
Sources map[string]SourceActionResponse `json:"sources"`
}
// CreateRequest contains request to create a repository in a given storage
type CreateRequest struct {
Storage blob.ConnectionInfo `json:"storage"`
Password string `json:"password"`
CacheOptions content.CachingOptions `json:"cacheOptions"`
NewRepositoryOptions repo.NewRepositoryOptions `json:"options"`
}
// ConnectRequest contains request to connect to a repository.
type ConnectRequest struct {
Storage blob.ConnectionInfo `json:"storage"`
Password string `json:"password"`
CacheOptions content.CachingOptions `json:"cacheOptions"`
}

View File

@@ -27,10 +27,10 @@
// NewRepositoryOptions specifies options that apply to newly created repositories.
// All fields are optional, when not provided, reasonable defaults will be used.
type NewRepositoryOptions struct {
UniqueID []byte // force the use of particular unique ID
BlockFormat content.FormattingOptions
DisableHMAC bool
ObjectFormat object.Format // object format
UniqueID []byte `json:"uniqueID"` // force the use of particular unique ID
BlockFormat content.FormattingOptions `json:"blockFormat"`
DisableHMAC bool `json:"disableHMAC"`
ObjectFormat object.Format `json:"objectFormat"` // object format
}
// Initialize creates initial repository data structures in the specified storage with given credentials.

View File

@@ -28,8 +28,8 @@ type Repository struct {
// Close closes the repository and releases all resources.
func (r *Repository) Close(ctx context.Context) error {
if err := r.Manifests.Flush(ctx); err != nil {
return errors.Wrap(err, "error flushing manifests")
if err := r.Flush(ctx); err != nil {
return errors.Wrap(err, "error flushing")
}
if err := r.Content.Close(ctx); err != nil {

View File

@@ -1,11 +1,14 @@
package endtoend_test
import (
"context"
"strings"
"testing"
"time"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/filesystem"
"github.com/kopia/kopia/tests/testenv"
)
@@ -42,20 +45,74 @@ func (s *serverParameters) ProcessOutput(l string) bool {
func TestServerStart(t *testing.T) {
t.Parallel()
ctx := context.Background()
e := testenv.NewCLITest(t)
defer e.Cleanup(t)
defer e.RunAndExpectSuccess(t, "repo", "disconnect")
e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir)
// Start the server and wait for it to auto-shutdown in 3 seconds.
// If this does not work, we avoid starting a longer test.
e.RunAndExpectSuccess(t, "server", "start", "--ui", "--auto-shutdown=3s")
time.Sleep(3 * time.Second)
e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1)
e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1)
var sp serverParameters
e.RunAndProcessStderr(t, sp.ProcessOutput, "server", "start", "--ui", "--random-password", "--tls-generate-cert", "--auto-shutdown=60s")
e.RunAndProcessStderr(t, sp.ProcessOutput, "server", "start", "--ui", "--address=localhost:0", "--random-password", "--tls-generate-cert", "--auto-shutdown=10s")
t.Logf("detected server parameters %#v", sp)
cli, err := serverapi.NewClient(serverapi.ClientOptions{
BaseURL: sp.baseURL,
Password: sp.password,
TrustedServerCertificateFingerprint: sp.sha256Fingerprint,
LogRequests: true,
})
if err != nil {
t.Fatalf("unable to create API client")
}
defer cli.Shutdown(ctx) // nolint:errcheck
time.Sleep(1 * time.Second)
st := verifyServerConnected(t, cli, true)
if got, want := st.Storage, "filesystem"; got != want {
t.Errorf("unexpected storage type: %v, want %v", got, want)
}
sources, err := cli.Sources(ctx)
if err != nil {
t.Fatalf("error listing sources: %v", err)
}
if got, want := len(sources.Sources), 1; got != want {
t.Errorf("unexpected number of sources %v, want %v", got, want)
}
if got, want := sources.Sources[0].Source.Path, sharedTestDataDir1; got != want {
t.Errorf("unexpected source path: %v, want %v", got, want)
}
}
func TestServerStartWithoutInitialRepository(t *testing.T) {
t.Parallel()
ctx := context.Background()
e := testenv.NewCLITest(t)
defer e.Cleanup(t)
defer e.RunAndExpectSuccess(t, "repo", "disconnect")
var sp serverParameters
connInfo := blob.ConnectionInfo{
Type: "filesystem",
Config: filesystem.Options{
Path: e.RepoDir,
},
}
e.RunAndProcessStderr(t, sp.ProcessOutput, "server", "start", "--ui", "--address=localhost:0", "--random-password", "--tls-generate-cert", "--auto-shutdown=60s")
t.Logf("detected server parameters %#v", sp)
cli, err := serverapi.NewClient(serverapi.ClientOptions{
@@ -67,16 +124,48 @@ func TestServerStart(t *testing.T) {
t.Fatalf("unable to create API client")
}
defer cli.Shutdown(ctx) // nolint:errcheck
time.Sleep(1 * time.Second)
if err := cli.Get("status", &serverapi.Empty{}); err != nil {
t.Errorf("status error: %v", err)
verifyServerConnected(t, cli, false)
if err = cli.CreateRepository(ctx, &serverapi.CreateRequest{
Password: "foofoo",
Storage: connInfo,
}); err != nil {
t.Fatalf("create error: %v", err)
}
// TODO - add more tests
verifyServerConnected(t, cli, true)
// explicit shutdown
if err := cli.Post("shutdown", &serverapi.Empty{}, &serverapi.Empty{}); err != nil {
t.Logf("expected shutdown error: %v", err)
if err = cli.DisconnectFromRepository(ctx); err != nil {
t.Fatalf("disconnect error: %v", err)
}
verifyServerConnected(t, cli, false)
if err = cli.ConnectToRepository(ctx, &serverapi.ConnectRequest{
Password: "foofoo",
Storage: connInfo,
}); err != nil {
t.Fatalf("create error: %v", err)
}
verifyServerConnected(t, cli, true)
}
func verifyServerConnected(t *testing.T, cli *serverapi.Client, want bool) *serverapi.StatusResponse {
t.Helper()
st, err := cli.Status(context.Background())
if err != nil {
t.Fatalf("status error: %v", err)
}
if got := st.Connected; got != want {
t.Errorf("invalid status connected %v, want %v", st.Connected, want)
}
return st
}