From 4cb898927c5c1f52b2d05caccfd683eb8469a401 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 15 Feb 2020 12:51:28 -0800 Subject: [PATCH] server: new APIs and error codes to support UI flow for connecting to repository --- internal/server/api_error.go | 13 +-- internal/server/api_repo.go | 100 ++++++++++++++++----- internal/server/server.go | 48 +++++++--- internal/server/source_manager.go | 36 +++++++- internal/serverapi/serverapi.go | 40 +++++++-- tests/end_to_end_test/server_start_test.go | 6 +- 6 files changed, 195 insertions(+), 48 deletions(-) diff --git a/internal/server/api_error.go b/internal/server/api_error.go index 1c3941eb1..8882c9f7d 100644 --- a/internal/server/api_error.go +++ b/internal/server/api_error.go @@ -2,17 +2,20 @@ import ( "fmt" + + "github.com/kopia/kopia/internal/serverapi" ) type apiError struct { - code int - message string + httpErrorCode int + apiErrorCode serverapi.APIErrorCode + message string } -func requestError(message string) *apiError { - return &apiError{400, message} +func requestError(apiErrorCode serverapi.APIErrorCode, message string) *apiError { + return &apiError{400, apiErrorCode, message} } func internalServerError(err error) *apiError { - return &apiError{500, fmt.Sprintf("internal server error: %v", err)} + return &apiError{500, serverapi.ErrorInternal, fmt.Sprintf("internal server error: %v", err)} } diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go index cbf306942..13f791430 100644 --- a/internal/server/api_repo.go +++ b/internal/server/api_repo.go @@ -10,6 +10,8 @@ "github.com/kopia/kopia/internal/serverapi" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/object" ) func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) { @@ -31,25 +33,45 @@ func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interfa }, nil } +func maybeDecodeToken(req *serverapi.ConnectRequest) *apiError { + if req.Token != "" { + ci, password, err := repo.DecodeToken(req.Token) + if err != nil { + return requestError(serverapi.ErrorInvalidToken, "invalid token: "+err.Error()) + } + + req.Storage = ci + if password != "" { + req.Password = password + } + } + + return nil +} + func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) { if s.rep != nil { - return nil, requestError("already connected") + return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected") } var req serverapi.CreateRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - return nil, requestError("unable to decode request: " + err.Error()) + return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error()) + } + + if err := maybeDecodeToken(&req.ConnectRequest); err != nil { + return nil, err } st, err := blob.NewStorage(ctx, req.Storage) if err != nil { - return nil, internalServerError(errors.Wrap(err, "unable to connect to storage")) + return nil, requestError(serverapi.ErrorStorageConnection, "unable to connect to storage: "+err.Error()) } defer st.Close(ctx) //nolint:errcheck if err := repo.Initialize(ctx, st, &req.NewRepositoryOptions, req.Password); err != nil { - return nil, internalServerError(errors.Wrap(err, "unable to initialize repository")) + return nil, repoErrorToAPIError(err) } return s.connectAndOpen(ctx, req.Storage, req.Password) @@ -57,32 +79,51 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interface{}, *apiError) { if s.rep != nil { - return nil, requestError("already connected") + return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected") } var req serverapi.ConnectRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - return nil, requestError("unable to decode request: " + err.Error()) + return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error()) + } + + if err := maybeDecodeToken(&req); err != nil { + return nil, err } return s.connectAndOpen(ctx, req.Storage, req.Password) } +func (s *Server) handleRepoSupportedAlgorithms(ctx context.Context, r *http.Request) (interface{}, *apiError) { + res := &serverapi.SupportedAlgorithmsResponse{ + DefaultHashAlgorithm: content.DefaultHash, + HashAlgorithms: content.SupportedHashAlgorithms(), + + DefaultEncryptionAlgorithm: content.DefaultEncryption, + EncryptionAlgorithms: content.SupportedEncryptionAlgorithms(), + + DefaultSplitterAlgorithm: object.DefaultSplitter, + SplitterAlgorithms: object.SupportedSplitters, + } + + return res, nil +} + func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, password string) (interface{}, *apiError) { st, err := blob.NewStorage(ctx, conn) if err != nil { - return nil, internalServerError(errors.Wrap(err, "can't open storage")) + return nil, requestError(serverapi.ErrorStorageConnection, "can't open storage: "+err.Error()) } defer st.Close(ctx) //nolint:errcheck if err = repo.Connect(ctx, s.options.ConfigFile, st, password, s.options.ConnectOptions); err != nil { - return nil, internalServerError(errors.Wrap(err, "connect error")) + return nil, repoErrorToAPIError(err) } rep, err := repo.Open(ctx, s.options.ConfigFile, password, nil) if err != nil { - return nil, internalServerError(errors.Wrap(err, "open error")) + return nil, repoErrorToAPIError(err) } // release shared lock so that SetRepository can acquire exclusive lock @@ -99,14 +140,6 @@ func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, p } func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (interface{}, *apiError) { - if s.rep == nil { - return nil, requestError("already disconnected") - } - - if err := repo.Disconnect(s.options.ConfigFile); err != nil { - return nil, internalServerError(err) - } - // release shared lock so that SetRepository can acquire exclusive lock s.mu.RUnlock() err := s.SetRepository(ctx, nil) @@ -116,13 +149,38 @@ func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (int return nil, internalServerError(err) } + if err := repo.Disconnect(s.options.ConfigFile); err != nil { + return nil, internalServerError(err) + } + return &serverapi.Empty{}, nil } -func (s *Server) handleRepoLock(ctx context.Context, r *http.Request) (interface{}, *apiError) { - return nil, &apiError{code: http.StatusNotImplemented} +func (s *Server) handleRepoSync(ctx context.Context, r *http.Request) (interface{}, *apiError) { + if err := s.rep.Refresh(ctx); err != nil { + return nil, internalServerError(errors.Wrap(err, "unable to refresh repository")) + } + + // release shared lock so that SyncSources can acquire exclusive lock + s.mu.RUnlock() + err := s.SyncSources(ctx) + s.mu.RLock() + if err != nil { + return nil, internalServerError(errors.Wrap(err, "unable to sync sources")) + } + + return &serverapi.Empty{}, nil } -func (s *Server) handleRepoUnlock(ctx context.Context, r *http.Request) (interface{}, *apiError) { - return nil, &apiError{code: http.StatusNotImplemented} +func repoErrorToAPIError(err error) *apiError { + switch err { + case repo.ErrRepositoryNotInitialized: + return requestError(serverapi.ErrorNotInitialized, "repository not initialized") + case repo.ErrInvalidPassword: + return requestError(serverapi.ErrorInvalidPassword, "invalid password") + case repo.ErrAlreadyInitialized: + return requestError(serverapi.ErrorAlreadyInitialized, "repository already initialized") + default: + return internalServerError(errors.Wrap(err, "connect error")) + } } diff --git a/internal/server/server.go b/internal/server/server.go index ad6a61da8..b8a078cdf 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -44,7 +44,7 @@ func (s *Server) APIHandlers() http.Handler { mux.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh, "POST")) mux.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush, "POST")) - mux.HandleFunc("/api/v1/shutdown", s.handleAPI(s.handleShutdown, "POST")) + mux.HandleFunc("/api/v1/shutdown", s.handleAPIPossiblyNotConnected(s.handleShutdown, "POST")) mux.HandleFunc("/api/v1/sources/pause", s.handleAPI(s.handlePause, "POST")) mux.HandleFunc("/api/v1/sources/resume", s.handleAPI(s.handleResume, "POST")) @@ -53,21 +53,33 @@ func (s *Server) APIHandlers() http.Handler { mux.HandleFunc("/api/v1/objects/", s.handleObjectGet) - mux.HandleFunc("/api/v1/repo/status", s.handleAPI(s.handleRepoStatus, "GET")) - mux.HandleFunc("/api/v1/repo/connect", s.handleAPI(s.handleRepoConnect, "POST")) - mux.HandleFunc("/api/v1/repo/create", s.handleAPI(s.handleRepoCreate, "POST")) + mux.HandleFunc("/api/v1/repo/status", s.handleAPIPossiblyNotConnected(s.handleRepoStatus, "GET")) + mux.HandleFunc("/api/v1/repo/connect", s.handleAPIPossiblyNotConnected(s.handleRepoConnect, "POST")) + mux.HandleFunc("/api/v1/repo/create", s.handleAPIPossiblyNotConnected(s.handleRepoCreate, "POST")) mux.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect, "POST")) - mux.HandleFunc("/api/v1/repo/lock", s.handleAPI(s.handleRepoLock, "POST")) - mux.HandleFunc("/api/v1/repo/unlock", s.handleAPI(s.handleRepoUnlock, "POST")) + mux.HandleFunc("/api/v1/repo/algorithms", s.handleAPIPossiblyNotConnected(s.handleRepoSupportedAlgorithms, "GET")) + mux.HandleFunc("/api/v1/repo/sync", s.handleAPI(s.handleRepoSync, "POST")) return mux } func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError), httpMethod string) http.HandlerFunc { + return s.handleAPIPossiblyNotConnected(func(ctx context.Context, r *http.Request) (interface{}, *apiError) { + if s.rep == nil { + return nil, requestError(serverapi.ErrorNotConnected, "not connected") + } + + return f(ctx, r) + }, httpMethod) +} + +func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *http.Request) (interface{}, *apiError), httpMethod string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { s.mu.RLock() defer s.mu.RUnlock() + log.Debug("request %v", r.URL) + if r.Method != httpMethod { http.Error(w, "incompatible HTTP method", http.StatusMethodNotAllowed) return @@ -89,9 +101,11 @@ func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interfa w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") - w.WriteHeader(err.code) + w.WriteHeader(err.httpErrorCode) + log.Debug("error code %v message %v", err.apiErrorCode, err.message) _ = e.Encode(&serverapi.ErrorResponse{ + Code: err.apiErrorCode, Error: err.message, }) } @@ -154,14 +168,14 @@ func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{} } func (s *Server) beginUpload(src snapshot.SourceInfo) { - log.Infof("waiting on semaphore to upload %v", src) + log.Debugf("waiting on semaphore to upload %v", src) s.uploadSemaphore <- struct{}{} - log.Infof("entered semaphore to upload %v", src) + log.Debugf("entered semaphore to upload %v", src) } func (s *Server) endUpload(src snapshot.SourceInfo) { - log.Infof("finished uploading %v", src) + log.Debugf("finished uploading %v", src) <-s.uploadSemaphore } @@ -178,7 +192,9 @@ func (s *Server) SetRepository(ctx context.Context, rep *repo.Repository) error if s.rep != nil { // close previous source managers + log.Infof("stopping all source managers") s.stopAllSourceManagersLocked() + log.Infof("stopped all source managers") if err := s.rep.Close(ctx); err != nil { return errors.Wrap(err, "unable to close previous repository") @@ -220,10 +236,22 @@ func (s *Server) refreshPeriodically(ctx context.Context, r *repo.Repository) { if err := r.Refresh(ctx); err != nil { log.Warningf("error refreshing repository: %v", err) } + + if err := s.syncSourcesLocked(ctx); err != nil { + log.Warningf("unable to sync sources: %v", err) + } } } } +// SyncSources synchronizes the repository and source managers. +func (s *Server) SyncSources(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.syncSourcesLocked(ctx) +} + // StopAllSourceManagers causes all source managers to stop. func (s *Server) StopAllSourceManagers() { s.mu.Lock() diff --git a/internal/server/source_manager.go b/internal/server/source_manager.go index f80f928d1..601b8354a 100644 --- a/internal/server/source_manager.go +++ b/internal/server/source_manager.go @@ -34,6 +34,7 @@ type sourceManager struct { wg sync.WaitGroup mu sync.RWMutex + uploader *snapshotfs.Uploader pol policy.SchedulingPolicy state string nextSnapshotTime *time.Time @@ -66,8 +67,22 @@ func (s *sourceManager) Status() *serverapi.SourceStatus { func (s *sourceManager) setStatus(stat string) { s.mu.Lock() + defer s.mu.Unlock() s.state = stat - s.mu.Unlock() +} + +func (s *sourceManager) currentUploader() *snapshotfs.Uploader { + s.mu.Lock() + defer s.mu.Unlock() + + return s.uploader +} + +func (s *sourceManager) setUploader(u *snapshotfs.Uploader) { + s.mu.Lock() + defer s.mu.Unlock() + + s.uploader = u } func (s *sourceManager) run(ctx context.Context) { @@ -153,6 +168,12 @@ func (s *sourceManager) resume() serverapi.SourceActionResponse { func (s *sourceManager) stop() { log.Debugf("stopping source manager for %v", s.src) + + if u := s.currentUploader(); u != nil { + log.Infof("canceling current upload") + u.Cancel() + } + close(s.closed) } @@ -165,6 +186,15 @@ func (s *sourceManager) snapshot(ctx context.Context) { s.server.beginUpload(s.src) defer s.server.endUpload(s.src) + // check if we got closed while waiting on semaphore + select { + case <-s.closed: + log.Infof("not snapshotting %v because source manager is shutting down", s.src) + return + + default: + } + localEntry, err := localfs.NewEntry(s.src.Path) if err != nil { log.Errorf("unable to create local filesystem: %v", err) @@ -181,8 +211,10 @@ func (s *sourceManager) snapshot(ctx context.Context) { u.Progress = s.progress log.Infof("starting upload of %v", s.src) - + s.setUploader(u) manifest, err := u.Upload(ctx, localEntry, policyTree, s.src, s.lastCompleteSnapshot, s.lastSnapshot) + s.setUploader(nil) + if err != nil { log.Errorf("upload error: %v", err) return diff --git a/internal/serverapi/serverapi.go b/internal/serverapi/serverapi.go index fac7f6b75..a87a26124 100644 --- a/internal/serverapi/serverapi.go +++ b/internal/serverapi/serverapi.go @@ -6,7 +6,6 @@ "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" - "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" "github.com/kopia/kopia/snapshot/snapshotfs" @@ -55,9 +54,26 @@ type PoliciesResponse struct { type Empty struct { } +// APIErrorCode indicates machine-readable error code returned in API responses. +type APIErrorCode string + +// Supported error codes. +const ( + ErrorInternal APIErrorCode = "INTERNAL" + ErrorMalformedRequest APIErrorCode = "MALFORMED_REQUEST" + ErrorInvalidToken APIErrorCode = "INVALID_TOKEN" + ErrorStorageConnection APIErrorCode = "STORAGE_CONNECTION" + ErrorAlreadyInitialized APIErrorCode = "ALREADY_INITIALIZED" + ErrorNotInitialized APIErrorCode = "NOT_INITIALIZED" + ErrorAlreadyConnected APIErrorCode = "ALREADY_CONNECTED" + ErrorNotConnected APIErrorCode = "NOT_CONNECTED" + ErrorInvalidPassword APIErrorCode = "INVALID_PASSWORD" +) + // ErrorResponse represents error response. type ErrorResponse struct { - Error string `json:"error"` + Code APIErrorCode `json:"code"` + Error string `json:"error"` } // SourceActionResponse is a per-source response. @@ -72,15 +88,23 @@ type MultipleSourceActionResponse struct { // CreateRequest contains request to create a repository in a given storage type CreateRequest struct { - Storage blob.ConnectionInfo `json:"storage"` - Password string `json:"password"` - CacheOptions content.CachingOptions `json:"cacheOptions"` + ConnectRequest NewRepositoryOptions repo.NewRepositoryOptions `json:"options"` } // ConnectRequest contains request to connect to a repository. type ConnectRequest struct { - Storage blob.ConnectionInfo `json:"storage"` - Password string `json:"password"` - CacheOptions content.CachingOptions `json:"cacheOptions"` + Storage blob.ConnectionInfo `json:"storage"` + Password string `json:"password"` + Token string `json:"token"` // when set, overrides Storage and Password +} + +// SupportedAlgorithmsResponse returns the list of supported algorithms for repository creation. +type SupportedAlgorithmsResponse struct { + DefaultHashAlgorithm string `json:"defaultHash"` + DefaultEncryptionAlgorithm string `json:"defaultEncryption"` + DefaultSplitterAlgorithm string `json:"defaultSplitter"` + HashAlgorithms []string `json:"hash"` + EncryptionAlgorithms []string `json:"encryption"` + SplitterAlgorithms []string `json:"splitter"` } diff --git a/tests/end_to_end_test/server_start_test.go b/tests/end_to_end_test/server_start_test.go index e679cff7f..86d007c7d 100644 --- a/tests/end_to_end_test/server_start_test.go +++ b/tests/end_to_end_test/server_start_test.go @@ -130,8 +130,10 @@ func TestServerStartWithoutInitialRepository(t *testing.T) { verifyServerConnected(t, cli, false) if err = cli.CreateRepository(ctx, &serverapi.CreateRequest{ - Password: "foofoo", - Storage: connInfo, + ConnectRequest: serverapi.ConnectRequest{ + Password: "foofoo", + Storage: connInfo, + }, }); err != nil { t.Fatalf("create error: %v", err) }