server: new APIs and error codes to support UI flow for connecting to repository

This commit is contained in:
Jarek Kowalski
2020-02-15 12:51:28 -08:00
parent cf4f79a4d1
commit 4cb898927c
6 changed files with 195 additions and 48 deletions

View File

@@ -2,17 +2,20 @@
import (
"fmt"
"github.com/kopia/kopia/internal/serverapi"
)
type apiError struct {
code int
message string
httpErrorCode int
apiErrorCode serverapi.APIErrorCode
message string
}
func requestError(message string) *apiError {
return &apiError{400, message}
func requestError(apiErrorCode serverapi.APIErrorCode, message string) *apiError {
return &apiError{400, apiErrorCode, message}
}
func internalServerError(err error) *apiError {
return &apiError{500, fmt.Sprintf("internal server error: %v", err)}
return &apiError{500, serverapi.ErrorInternal, fmt.Sprintf("internal server error: %v", err)}
}

View File

@@ -10,6 +10,8 @@
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/object"
)
func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) {
@@ -31,25 +33,45 @@ func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interfa
}, nil
}
func maybeDecodeToken(req *serverapi.ConnectRequest) *apiError {
if req.Token != "" {
ci, password, err := repo.DecodeToken(req.Token)
if err != nil {
return requestError(serverapi.ErrorInvalidToken, "invalid token: "+err.Error())
}
req.Storage = ci
if password != "" {
req.Password = password
}
}
return nil
}
func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep != nil {
return nil, requestError("already connected")
return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected")
}
var req serverapi.CreateRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, requestError("unable to decode request: " + err.Error())
return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error())
}
if err := maybeDecodeToken(&req.ConnectRequest); err != nil {
return nil, err
}
st, err := blob.NewStorage(ctx, req.Storage)
if err != nil {
return nil, internalServerError(errors.Wrap(err, "unable to connect to storage"))
return nil, requestError(serverapi.ErrorStorageConnection, "unable to connect to storage: "+err.Error())
}
defer st.Close(ctx) //nolint:errcheck
if err := repo.Initialize(ctx, st, &req.NewRepositoryOptions, req.Password); err != nil {
return nil, internalServerError(errors.Wrap(err, "unable to initialize repository"))
return nil, repoErrorToAPIError(err)
}
return s.connectAndOpen(ctx, req.Storage, req.Password)
@@ -57,32 +79,51 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa
func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep != nil {
return nil, requestError("already connected")
return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected")
}
var req serverapi.ConnectRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, requestError("unable to decode request: " + err.Error())
return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error())
}
if err := maybeDecodeToken(&req); err != nil {
return nil, err
}
return s.connectAndOpen(ctx, req.Storage, req.Password)
}
func (s *Server) handleRepoSupportedAlgorithms(ctx context.Context, r *http.Request) (interface{}, *apiError) {
res := &serverapi.SupportedAlgorithmsResponse{
DefaultHashAlgorithm: content.DefaultHash,
HashAlgorithms: content.SupportedHashAlgorithms(),
DefaultEncryptionAlgorithm: content.DefaultEncryption,
EncryptionAlgorithms: content.SupportedEncryptionAlgorithms(),
DefaultSplitterAlgorithm: object.DefaultSplitter,
SplitterAlgorithms: object.SupportedSplitters,
}
return res, nil
}
func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, password string) (interface{}, *apiError) {
st, err := blob.NewStorage(ctx, conn)
if err != nil {
return nil, internalServerError(errors.Wrap(err, "can't open storage"))
return nil, requestError(serverapi.ErrorStorageConnection, "can't open storage: "+err.Error())
}
defer st.Close(ctx) //nolint:errcheck
if err = repo.Connect(ctx, s.options.ConfigFile, st, password, s.options.ConnectOptions); err != nil {
return nil, internalServerError(errors.Wrap(err, "connect error"))
return nil, repoErrorToAPIError(err)
}
rep, err := repo.Open(ctx, s.options.ConfigFile, password, nil)
if err != nil {
return nil, internalServerError(errors.Wrap(err, "open error"))
return nil, repoErrorToAPIError(err)
}
// release shared lock so that SetRepository can acquire exclusive lock
@@ -99,14 +140,6 @@ func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, p
}
func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep == nil {
return nil, requestError("already disconnected")
}
if err := repo.Disconnect(s.options.ConfigFile); err != nil {
return nil, internalServerError(err)
}
// release shared lock so that SetRepository can acquire exclusive lock
s.mu.RUnlock()
err := s.SetRepository(ctx, nil)
@@ -116,13 +149,38 @@ func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (int
return nil, internalServerError(err)
}
if err := repo.Disconnect(s.options.ConfigFile); err != nil {
return nil, internalServerError(err)
}
return &serverapi.Empty{}, nil
}
func (s *Server) handleRepoLock(ctx context.Context, r *http.Request) (interface{}, *apiError) {
return nil, &apiError{code: http.StatusNotImplemented}
func (s *Server) handleRepoSync(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if err := s.rep.Refresh(ctx); err != nil {
return nil, internalServerError(errors.Wrap(err, "unable to refresh repository"))
}
// release shared lock so that SyncSources can acquire exclusive lock
s.mu.RUnlock()
err := s.SyncSources(ctx)
s.mu.RLock()
if err != nil {
return nil, internalServerError(errors.Wrap(err, "unable to sync sources"))
}
return &serverapi.Empty{}, nil
}
func (s *Server) handleRepoUnlock(ctx context.Context, r *http.Request) (interface{}, *apiError) {
return nil, &apiError{code: http.StatusNotImplemented}
func repoErrorToAPIError(err error) *apiError {
switch err {
case repo.ErrRepositoryNotInitialized:
return requestError(serverapi.ErrorNotInitialized, "repository not initialized")
case repo.ErrInvalidPassword:
return requestError(serverapi.ErrorInvalidPassword, "invalid password")
case repo.ErrAlreadyInitialized:
return requestError(serverapi.ErrorAlreadyInitialized, "repository already initialized")
default:
return internalServerError(errors.Wrap(err, "connect error"))
}
}

View File

@@ -44,7 +44,7 @@ func (s *Server) APIHandlers() http.Handler {
mux.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh, "POST"))
mux.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush, "POST"))
mux.HandleFunc("/api/v1/shutdown", s.handleAPI(s.handleShutdown, "POST"))
mux.HandleFunc("/api/v1/shutdown", s.handleAPIPossiblyNotConnected(s.handleShutdown, "POST"))
mux.HandleFunc("/api/v1/sources/pause", s.handleAPI(s.handlePause, "POST"))
mux.HandleFunc("/api/v1/sources/resume", s.handleAPI(s.handleResume, "POST"))
@@ -53,21 +53,33 @@ func (s *Server) APIHandlers() http.Handler {
mux.HandleFunc("/api/v1/objects/", s.handleObjectGet)
mux.HandleFunc("/api/v1/repo/status", s.handleAPI(s.handleRepoStatus, "GET"))
mux.HandleFunc("/api/v1/repo/connect", s.handleAPI(s.handleRepoConnect, "POST"))
mux.HandleFunc("/api/v1/repo/create", s.handleAPI(s.handleRepoCreate, "POST"))
mux.HandleFunc("/api/v1/repo/status", s.handleAPIPossiblyNotConnected(s.handleRepoStatus, "GET"))
mux.HandleFunc("/api/v1/repo/connect", s.handleAPIPossiblyNotConnected(s.handleRepoConnect, "POST"))
mux.HandleFunc("/api/v1/repo/create", s.handleAPIPossiblyNotConnected(s.handleRepoCreate, "POST"))
mux.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect, "POST"))
mux.HandleFunc("/api/v1/repo/lock", s.handleAPI(s.handleRepoLock, "POST"))
mux.HandleFunc("/api/v1/repo/unlock", s.handleAPI(s.handleRepoUnlock, "POST"))
mux.HandleFunc("/api/v1/repo/algorithms", s.handleAPIPossiblyNotConnected(s.handleRepoSupportedAlgorithms, "GET"))
mux.HandleFunc("/api/v1/repo/sync", s.handleAPI(s.handleRepoSync, "POST"))
return mux
}
func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError), httpMethod string) http.HandlerFunc {
return s.handleAPIPossiblyNotConnected(func(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep == nil {
return nil, requestError(serverapi.ErrorNotConnected, "not connected")
}
return f(ctx, r)
}, httpMethod)
}
func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *http.Request) (interface{}, *apiError), httpMethod string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
s.mu.RLock()
defer s.mu.RUnlock()
log.Debug("request %v", r.URL)
if r.Method != httpMethod {
http.Error(w, "incompatible HTTP method", http.StatusMethodNotAllowed)
return
@@ -89,9 +101,11 @@ func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interfa
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(err.code)
w.WriteHeader(err.httpErrorCode)
log.Debug("error code %v message %v", err.apiErrorCode, err.message)
_ = e.Encode(&serverapi.ErrorResponse{
Code: err.apiErrorCode,
Error: err.message,
})
}
@@ -154,14 +168,14 @@ func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{}
}
func (s *Server) beginUpload(src snapshot.SourceInfo) {
log.Infof("waiting on semaphore to upload %v", src)
log.Debugf("waiting on semaphore to upload %v", src)
s.uploadSemaphore <- struct{}{}
log.Infof("entered semaphore to upload %v", src)
log.Debugf("entered semaphore to upload %v", src)
}
func (s *Server) endUpload(src snapshot.SourceInfo) {
log.Infof("finished uploading %v", src)
log.Debugf("finished uploading %v", src)
<-s.uploadSemaphore
}
@@ -178,7 +192,9 @@ func (s *Server) SetRepository(ctx context.Context, rep *repo.Repository) error
if s.rep != nil {
// close previous source managers
log.Infof("stopping all source managers")
s.stopAllSourceManagersLocked()
log.Infof("stopped all source managers")
if err := s.rep.Close(ctx); err != nil {
return errors.Wrap(err, "unable to close previous repository")
@@ -220,10 +236,22 @@ func (s *Server) refreshPeriodically(ctx context.Context, r *repo.Repository) {
if err := r.Refresh(ctx); err != nil {
log.Warningf("error refreshing repository: %v", err)
}
if err := s.syncSourcesLocked(ctx); err != nil {
log.Warningf("unable to sync sources: %v", err)
}
}
}
}
// SyncSources synchronizes the repository and source managers.
func (s *Server) SyncSources(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.syncSourcesLocked(ctx)
}
// StopAllSourceManagers causes all source managers to stop.
func (s *Server) StopAllSourceManagers() {
s.mu.Lock()

View File

@@ -34,6 +34,7 @@ type sourceManager struct {
wg sync.WaitGroup
mu sync.RWMutex
uploader *snapshotfs.Uploader
pol policy.SchedulingPolicy
state string
nextSnapshotTime *time.Time
@@ -66,8 +67,22 @@ func (s *sourceManager) Status() *serverapi.SourceStatus {
func (s *sourceManager) setStatus(stat string) {
s.mu.Lock()
defer s.mu.Unlock()
s.state = stat
s.mu.Unlock()
}
func (s *sourceManager) currentUploader() *snapshotfs.Uploader {
s.mu.Lock()
defer s.mu.Unlock()
return s.uploader
}
func (s *sourceManager) setUploader(u *snapshotfs.Uploader) {
s.mu.Lock()
defer s.mu.Unlock()
s.uploader = u
}
func (s *sourceManager) run(ctx context.Context) {
@@ -153,6 +168,12 @@ func (s *sourceManager) resume() serverapi.SourceActionResponse {
func (s *sourceManager) stop() {
log.Debugf("stopping source manager for %v", s.src)
if u := s.currentUploader(); u != nil {
log.Infof("canceling current upload")
u.Cancel()
}
close(s.closed)
}
@@ -165,6 +186,15 @@ func (s *sourceManager) snapshot(ctx context.Context) {
s.server.beginUpload(s.src)
defer s.server.endUpload(s.src)
// check if we got closed while waiting on semaphore
select {
case <-s.closed:
log.Infof("not snapshotting %v because source manager is shutting down", s.src)
return
default:
}
localEntry, err := localfs.NewEntry(s.src.Path)
if err != nil {
log.Errorf("unable to create local filesystem: %v", err)
@@ -181,8 +211,10 @@ func (s *sourceManager) snapshot(ctx context.Context) {
u.Progress = s.progress
log.Infof("starting upload of %v", s.src)
s.setUploader(u)
manifest, err := u.Upload(ctx, localEntry, policyTree, s.src, s.lastCompleteSnapshot, s.lastSnapshot)
s.setUploader(nil)
if err != nil {
log.Errorf("upload error: %v", err)
return

View File

@@ -6,7 +6,6 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
@@ -55,9 +54,26 @@ type PoliciesResponse struct {
type Empty struct {
}
// APIErrorCode indicates machine-readable error code returned in API responses.
type APIErrorCode string
// Supported error codes.
const (
ErrorInternal APIErrorCode = "INTERNAL"
ErrorMalformedRequest APIErrorCode = "MALFORMED_REQUEST"
ErrorInvalidToken APIErrorCode = "INVALID_TOKEN"
ErrorStorageConnection APIErrorCode = "STORAGE_CONNECTION"
ErrorAlreadyInitialized APIErrorCode = "ALREADY_INITIALIZED"
ErrorNotInitialized APIErrorCode = "NOT_INITIALIZED"
ErrorAlreadyConnected APIErrorCode = "ALREADY_CONNECTED"
ErrorNotConnected APIErrorCode = "NOT_CONNECTED"
ErrorInvalidPassword APIErrorCode = "INVALID_PASSWORD"
)
// ErrorResponse represents error response.
type ErrorResponse struct {
Error string `json:"error"`
Code APIErrorCode `json:"code"`
Error string `json:"error"`
}
// SourceActionResponse is a per-source response.
@@ -72,15 +88,23 @@ type MultipleSourceActionResponse struct {
// CreateRequest contains request to create a repository in a given storage
type CreateRequest struct {
Storage blob.ConnectionInfo `json:"storage"`
Password string `json:"password"`
CacheOptions content.CachingOptions `json:"cacheOptions"`
ConnectRequest
NewRepositoryOptions repo.NewRepositoryOptions `json:"options"`
}
// ConnectRequest contains request to connect to a repository.
type ConnectRequest struct {
Storage blob.ConnectionInfo `json:"storage"`
Password string `json:"password"`
CacheOptions content.CachingOptions `json:"cacheOptions"`
Storage blob.ConnectionInfo `json:"storage"`
Password string `json:"password"`
Token string `json:"token"` // when set, overrides Storage and Password
}
// SupportedAlgorithmsResponse returns the list of supported algorithms for repository creation.
type SupportedAlgorithmsResponse struct {
DefaultHashAlgorithm string `json:"defaultHash"`
DefaultEncryptionAlgorithm string `json:"defaultEncryption"`
DefaultSplitterAlgorithm string `json:"defaultSplitter"`
HashAlgorithms []string `json:"hash"`
EncryptionAlgorithms []string `json:"encryption"`
SplitterAlgorithms []string `json:"splitter"`
}

View File

@@ -130,8 +130,10 @@ func TestServerStartWithoutInitialRepository(t *testing.T) {
verifyServerConnected(t, cli, false)
if err = cli.CreateRepository(ctx, &serverapi.CreateRequest{
Password: "foofoo",
Storage: connInfo,
ConnectRequest: serverapi.ConnectRequest{
Password: "foofoo",
Storage: connInfo,
},
}); err != nil {
t.Fatalf("create error: %v", err)
}