From 0f79279f5e369552f44e4115f051c5679e2a3e8a Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Tue, 11 Feb 2020 20:43:26 -0800 Subject: [PATCH] server: added support for new verbs in the API /api/v1/repo/create /api/v1/repo/connect /api/v1/repo/disconnect Refactored server code and fixed a number of outstanding robustness issues. Tweaked the API responses a bit to make more sense when consumed by the UI. --- cli/app.go | 18 +- cli/command_server_start.go | 31 +++- cli/config.go | 6 +- internal/server/api_error.go | 4 + internal/server/api_repo.go | 128 ++++++++++++++ internal/server/api_status.go | 21 --- internal/server/server.go | 190 +++++++++++++++++---- internal/server/source_manager.go | 124 +++++++------- internal/serverapi/client.go | 13 ++ internal/serverapi/client_wrappers.go | 37 ++++ internal/serverapi/serverapi.go | 53 ++++-- repo/initialize.go | 8 +- repo/repository.go | 4 +- tests/end_to_end_test/server_start_test.go | 111 ++++++++++-- 14 files changed, 588 insertions(+), 160 deletions(-) create mode 100644 internal/server/api_repo.go delete mode 100644 internal/server/api_status.go create mode 100644 internal/serverapi/client_wrappers.go diff --git a/cli/app.go b/cli/app.go index 693601961..fff8afa1d 100644 --- a/cli/app.go +++ b/cli/app.go @@ -65,6 +65,14 @@ func serverAction(act func(ctx context.Context, cli *serverapi.Client) error) fu } func repositoryAction(act func(ctx context.Context, rep *repo.Repository) error) func(ctx *kingpin.ParseContext) error { + return maybeRepositoryAction(act, true) +} + +func optionalRepositoryAction(act func(ctx context.Context, rep *repo.Repository) error) func(ctx *kingpin.ParseContext) error { + return maybeRepositoryAction(act, false) +} + +func maybeRepositoryAction(act func(ctx context.Context, rep *repo.Repository) error, required bool) func(ctx *kingpin.ParseContext) error { return func(kpc *kingpin.ParseContext) error { return withProfiling(func() error { startMemoryTracking() @@ -80,14 +88,16 @@ func repositoryAction(act func(ctx context.Context, rep *repo.Repository) error) } }) - rep, err := openRepository(ctx, nil) - if err != nil { + rep, err := openRepository(ctx, nil, required) + if err != nil && required { return errors.Wrap(err, "open repository") } err = act(ctx, rep) - if cerr := rep.Close(ctx); cerr != nil { - return errors.Wrap(cerr, "unable to close repository") + if rep != nil { + if cerr := rep.Close(ctx); cerr != nil { + return errors.Wrap(cerr, "unable to close repository") + } } return err }) diff --git a/cli/command_server_start.go b/cli/command_server_start.go index 4c2680d73..b547a96c8 100644 --- a/cli/command_server_start.go +++ b/cli/command_server_start.go @@ -30,16 +30,25 @@ func init() { addUserAndHostFlags(serverStartCommand) - serverStartCommand.Action(repositoryAction(runServer)) + setupConnectOptions(serverStartCommand) + serverStartCommand.Action(optionalRepositoryAction(runServer)) } func runServer(ctx context.Context, rep *repo.Repository) error { - srv, err := server.New(ctx, rep, getHostName(), getUserName()) + srv, err := server.New(ctx, rep, server.Options{ + ConfigFile: repositoryConfigFileName(), + Hostname: getHostName(), + Username: getUserName(), + ConnectOptions: connectOptions(), + RefreshInterval: *serverStartRefreshInterval, + }) if err != nil { return errors.Wrap(err, "unable to initialize server") } - go rep.RefreshPeriodically(ctx, *serverStartRefreshInterval) + if err = srv.SetRepository(ctx, rep); err != nil { + return errors.Wrap(err, "error connecting to repository") + } mux := http.NewServeMux() mux.Handle("/api/", srv.APIHandlers()) @@ -54,6 +63,14 @@ func runServer(ctx context.Context, rep *repo.Repository) error { httpServer := &http.Server{Addr: stripProtocol(*serverAddress)} srv.OnShutdown = httpServer.Shutdown + onCtrlC(func() { + log.Infof("Shutting down...") + + if err = httpServer.Shutdown(ctx); err != nil { + log.Warningf("unable to shut down: %v", err) + } + }) + handler := addInterceptors(mux) if as := *serverStartAutoShutdown; as > 0 { @@ -68,11 +85,13 @@ func runServer(ctx context.Context, rep *repo.Repository) error { httpServer.Handler = handler err = startServerWithOptionalTLS(httpServer) - if err == http.ErrServerClosed { - return nil + if err != http.ErrServerClosed { + return err } - return err + srv.StopAllSourceManagers() + + return nil } func stripProtocol(addr string) string { diff --git a/cli/config.go b/cli/config.go index b7b97de34..c716ee500 100644 --- a/cli/config.go +++ b/cli/config.go @@ -60,7 +60,11 @@ func waitForCtrlC() { <-done } -func openRepository(ctx context.Context, opts *repo.Options) (*repo.Repository, error) { +func openRepository(ctx context.Context, opts *repo.Options, required bool) (*repo.Repository, error) { + if _, err := os.Stat(repositoryConfigFileName()); os.IsNotExist(err) && !required { + return nil, nil + } + pass, err := getPasswordFromFlags(false, true) if err != nil { return nil, errors.Wrap(err, "get password") diff --git a/internal/server/api_error.go b/internal/server/api_error.go index 5889e4776..1c3941eb1 100644 --- a/internal/server/api_error.go +++ b/internal/server/api_error.go @@ -9,6 +9,10 @@ type apiError struct { message string } +func requestError(message string) *apiError { + return &apiError{400, message} +} + func internalServerError(err error) *apiError { return &apiError{500, fmt.Sprintf("internal server error: %v", err)} } diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go new file mode 100644 index 000000000..cbf306942 --- /dev/null +++ b/internal/server/api_repo.go @@ -0,0 +1,128 @@ +package server + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/serverapi" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/blob" +) + +func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) { + if s.rep == nil { + return &serverapi.StatusResponse{ + Connected: false, + }, nil + } + + return &serverapi.StatusResponse{ + Connected: true, + ConfigFile: s.rep.ConfigFile, + CacheDir: s.rep.Content.CachingOptions.CacheDirectory, + Hash: s.rep.Content.Format.Hash, + Encryption: s.rep.Content.Format.Encryption, + MaxPackSize: s.rep.Content.Format.MaxPackSize, + Splitter: s.rep.Objects.Format.Splitter, + Storage: s.rep.Blobs.ConnectionInfo().Type, + }, nil +} + +func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) { + if s.rep != nil { + return nil, requestError("already connected") + } + + var req serverapi.CreateRequest + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, requestError("unable to decode request: " + err.Error()) + } + + st, err := blob.NewStorage(ctx, req.Storage) + if err != nil { + return nil, internalServerError(errors.Wrap(err, "unable to connect to storage")) + } + defer st.Close(ctx) //nolint:errcheck + + if err := repo.Initialize(ctx, st, &req.NewRepositoryOptions, req.Password); err != nil { + return nil, internalServerError(errors.Wrap(err, "unable to initialize repository")) + } + + return s.connectAndOpen(ctx, req.Storage, req.Password) +} + +func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interface{}, *apiError) { + if s.rep != nil { + return nil, requestError("already connected") + } + + var req serverapi.ConnectRequest + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, requestError("unable to decode request: " + err.Error()) + } + + return s.connectAndOpen(ctx, req.Storage, req.Password) +} + +func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, password string) (interface{}, *apiError) { + st, err := blob.NewStorage(ctx, conn) + if err != nil { + return nil, internalServerError(errors.Wrap(err, "can't open storage")) + } + defer st.Close(ctx) //nolint:errcheck + + if err = repo.Connect(ctx, s.options.ConfigFile, st, password, s.options.ConnectOptions); err != nil { + return nil, internalServerError(errors.Wrap(err, "connect error")) + } + + rep, err := repo.Open(ctx, s.options.ConfigFile, password, nil) + if err != nil { + return nil, internalServerError(errors.Wrap(err, "open error")) + } + + // release shared lock so that SetRepository can acquire exclusive lock + s.mu.RUnlock() + err = s.SetRepository(ctx, rep) + s.mu.RLock() + + if err != nil { + defer rep.Close(ctx) // nolint:errcheck + return nil, internalServerError(err) + } + + return s.handleRepoStatus(ctx, &http.Request{}) +} + +func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (interface{}, *apiError) { + if s.rep == nil { + return nil, requestError("already disconnected") + } + + if err := repo.Disconnect(s.options.ConfigFile); err != nil { + return nil, internalServerError(err) + } + + // release shared lock so that SetRepository can acquire exclusive lock + s.mu.RUnlock() + err := s.SetRepository(ctx, nil) + s.mu.RLock() + + if err != nil { + return nil, internalServerError(err) + } + + return &serverapi.Empty{}, nil +} + +func (s *Server) handleRepoLock(ctx context.Context, r *http.Request) (interface{}, *apiError) { + return nil, &apiError{code: http.StatusNotImplemented} +} + +func (s *Server) handleRepoUnlock(ctx context.Context, r *http.Request) (interface{}, *apiError) { + return nil, &apiError{code: http.StatusNotImplemented} +} diff --git a/internal/server/api_status.go b/internal/server/api_status.go deleted file mode 100644 index 76cf24ece..000000000 --- a/internal/server/api_status.go +++ /dev/null @@ -1,21 +0,0 @@ -package server - -import ( - "context" - "net/http" - - "github.com/kopia/kopia/internal/serverapi" -) - -func (s *Server) handleStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) { - bf := s.rep.Content.Format - bf.HMACSecret = nil - bf.MasterKey = nil - - return &serverapi.StatusResponse{ - ConfigFile: s.rep.ConfigFile, - CacheDir: s.rep.Content.CachingOptions.CacheDirectory, - BlockFormatting: bf, - Storage: s.rep.Blobs.ConnectionInfo().Type, - }, nil -} diff --git a/internal/server/server.go b/internal/server/server.go index 56b1d87ee..ad6a61da8 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -7,6 +7,7 @@ "net/http" "net/url" "sync" + "time" "github.com/pkg/errors" @@ -22,9 +23,12 @@ type Server struct { OnShutdown func(ctx context.Context) error - hostname string - username string - rep *repo.Repository + options Options + rep *repo.Repository + cancelRep context.CancelFunc + + // all API requests run with shared lock on this mutex + // administrative actions run with an exclusive lock and block API calls. mu sync.RWMutex sourceManagers map[snapshot.SourceInfo]*sourceManager uploadSemaphore chan struct{} @@ -34,26 +38,35 @@ type Server struct { func (s *Server) APIHandlers() http.Handler { mux := http.NewServeMux() - mux.HandleFunc("/api/v1/status", s.handleAPI(s.handleStatus, "GET")) mux.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesList, "GET")) mux.HandleFunc("/api/v1/snapshots", s.handleAPI(s.handleSourceSnapshotList, "GET")) mux.HandleFunc("/api/v1/policies", s.handleAPI(s.handlePolicyList, "GET")) + mux.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh, "POST")) mux.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush, "POST")) mux.HandleFunc("/api/v1/shutdown", s.handleAPI(s.handleShutdown, "POST")) + mux.HandleFunc("/api/v1/sources/pause", s.handleAPI(s.handlePause, "POST")) mux.HandleFunc("/api/v1/sources/resume", s.handleAPI(s.handleResume, "POST")) mux.HandleFunc("/api/v1/sources/upload", s.handleAPI(s.handleUpload, "POST")) mux.HandleFunc("/api/v1/sources/cancel", s.handleAPI(s.handleCancel, "POST")) + mux.HandleFunc("/api/v1/objects/", s.handleObjectGet) + mux.HandleFunc("/api/v1/repo/status", s.handleAPI(s.handleRepoStatus, "GET")) + mux.HandleFunc("/api/v1/repo/connect", s.handleAPI(s.handleRepoConnect, "POST")) + mux.HandleFunc("/api/v1/repo/create", s.handleAPI(s.handleRepoCreate, "POST")) + mux.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect, "POST")) + mux.HandleFunc("/api/v1/repo/lock", s.handleAPI(s.handleRepoLock, "POST")) + mux.HandleFunc("/api/v1/repo/unlock", s.handleAPI(s.handleRepoUnlock, "POST")) + return mux } func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError), httpMethod string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() if r.Method != httpMethod { http.Error(w, "incompatible HTTP method", http.StatusMethodNotAllowed) @@ -74,7 +87,13 @@ func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interfa return } - http.Error(w, err.message, err.code) + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(err.code) + + _ = e.Encode(&serverapi.ErrorResponse{ + Error: err.message, + }) } } @@ -91,10 +110,12 @@ func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{}, func (s *Server) handleShutdown(ctx context.Context, r *http.Request) (interface{}, *apiError) { log.Infof("shutting down due to API request") - if s.OnShutdown != nil { - if err := s.OnShutdown(ctx); err != nil { - return nil, internalServerError(err) - } + if f := s.OnShutdown; f != nil { + go func() { + if err := f(ctx); err != nil { + log.Warningf("shutdown failed: %v", err) + } + }() } return &serverapi.Empty{}, nil @@ -144,30 +165,141 @@ func (s *Server) endUpload(src snapshot.SourceInfo) { <-s.uploadSemaphore } +// SetRepository sets the repository (nil is allowed and indicates server that is not +// connected to the repository). +func (s *Server) SetRepository(ctx context.Context, rep *repo.Repository) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.rep == rep { + // nothing to do + return nil + } + + if s.rep != nil { + // close previous source managers + s.stopAllSourceManagersLocked() + + if err := s.rep.Close(ctx); err != nil { + return errors.Wrap(err, "unable to close previous repository") + } + + cr := s.cancelRep + s.cancelRep = nil + + if cr != nil { + cr() + } + } + + s.rep = rep + if s.rep == nil { + return nil + } + + if err := s.syncSourcesLocked(ctx); err != nil { + s.stopAllSourceManagersLocked() + s.rep = nil + + return err + } + + ctx, s.cancelRep = context.WithCancel(ctx) + go s.refreshPeriodically(ctx, rep) + + return nil +} + +func (s *Server) refreshPeriodically(ctx context.Context, r *repo.Repository) { + for { + select { + case <-ctx.Done(): + return + + case <-time.After(s.options.RefreshInterval): + if err := r.Refresh(ctx); err != nil { + log.Warningf("error refreshing repository: %v", err) + } + } + } +} + +// StopAllSourceManagers causes all source managers to stop. +func (s *Server) StopAllSourceManagers() { + s.mu.Lock() + defer s.mu.Unlock() + + s.stopAllSourceManagersLocked() +} + +func (s *Server) stopAllSourceManagersLocked() { + for _, sm := range s.sourceManagers { + sm.stop() + } + + for _, sm := range s.sourceManagers { + sm.waitUntilStopped() + } + + s.sourceManagers = map[snapshot.SourceInfo]*sourceManager{} +} + +func (s *Server) syncSourcesLocked(ctx context.Context) error { + sources, err := snapshot.ListSources(ctx, s.rep) + if err != nil { + return errors.Wrap(err, "unable to list sources") + } + + // copy existing sources to a map, from which we will remove sources that are found + // in the repository + oldSourceManagers := map[snapshot.SourceInfo]*sourceManager{} + for k, v := range s.sourceManagers { + oldSourceManagers[k] = v + } + + for _, src := range sources { + if _, ok := oldSourceManagers[src]; ok { + // pre-existing source, already has a manager + delete(oldSourceManagers, src) + } else { + sm := newSourceManager(src, s) + s.sourceManagers[src] = sm + + go sm.run(ctx) + } + } + + // whatever is left in oldSourceManagers are managers for sources that don't exist anymore. + // stop source manager for sources no longer in the repo. + for _, sm := range oldSourceManagers { + sm.stop() + } + + for src, sm := range oldSourceManagers { + sm.waitUntilStopped() + delete(s.sourceManagers, src) + } + + return nil +} + +// Options encompasses all API server options. +type Options struct { + ConfigFile string + Hostname string + Username string + ConnectOptions *repo.ConnectOptions + RefreshInterval time.Duration +} + // New creates a Server on top of a given Repository. // The server will manage sources for a given username@hostname. -func New(ctx context.Context, rep *repo.Repository, hostname, username string) (*Server, error) { +func New(ctx context.Context, rep *repo.Repository, options Options) (*Server, error) { s := &Server{ - hostname: hostname, - username: username, - rep: rep, + options: options, sourceManagers: map[snapshot.SourceInfo]*sourceManager{}, uploadSemaphore: make(chan struct{}, 1), } - sources, err := snapshot.ListSources(ctx, rep) - if err != nil { - return nil, errors.Wrap(err, "unable to list sources") - } - - for _, src := range sources { - sm := newSourceManager(src, s) - s.sourceManagers[src] = sm - } - - for _, src := range s.sourceManagers { - go src.run(ctx) - } - return s, nil } diff --git a/internal/server/source_manager.go b/internal/server/source_manager.go index b4e866f87..f80f928d1 100644 --- a/internal/server/source_manager.go +++ b/internal/server/source_manager.go @@ -31,18 +31,16 @@ type sourceManager struct { server *Server src snapshot.SourceInfo closed chan struct{} + wg sync.WaitGroup mu sync.RWMutex - pol *policy.Policy + pol policy.SchedulingPolicy state string - nextSnapshotTime time.Time + nextSnapshotTime *time.Time lastCompleteSnapshot *snapshot.Manifest lastSnapshot *snapshot.Manifest - // state of current upload - uploadPath string - uploadPathCompleted int64 - uploadPathTotal int64 + progress *snapshotfs.CountingUploadProgress } func (s *sourceManager) Status() *serverapi.SourceStatus { @@ -52,15 +50,16 @@ func (s *sourceManager) Status() *serverapi.SourceStatus { st := &serverapi.SourceStatus{ Source: s.src, Status: s.state, - LastSnapshotSize: s.lastSnapshot.Stats.TotalFileSize, LastSnapshotTime: s.lastSnapshot.StartTime, NextSnapshotTime: s.nextSnapshotTime, - Policy: s.pol, + SchedulingPolicy: s.pol, } - st.UploadStatus.UploadingPath = s.uploadPath - st.UploadStatus.UploadingPathCompleted = s.uploadPathCompleted - st.UploadStatus.UploadingPathTotal = s.uploadPathTotal + if st.Status == "SNAPSHOTTING" { + c := s.progress.Snapshot() + + st.UploadCounters = &c + } return st } @@ -75,9 +74,14 @@ func (s *sourceManager) run(ctx context.Context) { s.setStatus("INITIALIZING") defer s.setStatus("STOPPED") - if s.server.hostname == s.src.Host { + s.wg.Add(1) + defer s.wg.Done() + + if s.server.options.Hostname == s.src.Host { + log.Debugf("starting local source manager for %v", s.src) s.runLocal(ctx) } else { + log.Debugf("starting remote source manager for %v", s.src) s.runRemote(ctx) } } @@ -86,13 +90,14 @@ func (s *sourceManager) runLocal(ctx context.Context) { s.refreshStatus(ctx) for { - var timeBeforeNextSnapshot time.Duration + var waitTime time.Duration - if !s.nextSnapshotTime.IsZero() { - timeBeforeNextSnapshot = time.Until(s.nextSnapshotTime) - log.Infof("time to next snapshot %v is %v", s.src, timeBeforeNextSnapshot) + if s.nextSnapshotTime != nil { + waitTime = time.Until(*s.nextSnapshotTime) + log.Debugf("time to next snapshot %v is %v", s.src, waitTime) } else { - timeBeforeNextSnapshot = oneDay + log.Debugf("no scheduled snapshot for %v", s.src) + waitTime = oneDay } s.setStatus("WAITING") @@ -103,8 +108,8 @@ func (s *sourceManager) runLocal(ctx context.Context) { case <-time.After(statusRefreshInterval): s.refreshStatus(ctx) - case <-time.After(timeBeforeNextSnapshot): - log.Infof("snapshotting %v", s.src) + case <-time.After(waitTime): + log.Debugf("snapshotting %v", s.src) s.setStatus("SNAPSHOTTING") s.snapshot(ctx) s.refreshStatus(ctx) @@ -126,25 +131,6 @@ func (s *sourceManager) runRemote(ctx context.Context) { } } -func (s *sourceManager) Progress(path string, numFiles int, pathCompleted, pathTotal int64, stats *snapshot.Stats) { - s.mu.Lock() - defer s.mu.Unlock() - - s.uploadPath = path - s.uploadPathCompleted = pathCompleted - s.uploadPathTotal = pathTotal - log.Debugf("path: %v %v/%v", path, pathCompleted, pathTotal) -} - -func (s *sourceManager) UploadFinished() { - s.mu.Lock() - defer s.mu.Unlock() - - s.uploadPath = "" - s.uploadPathCompleted = 0 - s.uploadPathTotal = 0 -} - func (s *sourceManager) upload() serverapi.SourceActionResponse { log.Infof("upload triggered via API: %v", s.src) return serverapi.SourceActionResponse{Success: true} @@ -165,6 +151,16 @@ func (s *sourceManager) resume() serverapi.SourceActionResponse { return serverapi.SourceActionResponse{Success: true} } +func (s *sourceManager) stop() { + log.Debugf("stopping source manager for %v", s.src) + close(s.closed) +} + +func (s *sourceManager) waitUntilStopped() { + s.wg.Wait() + log.Debugf("source manager for %v has stopped", s.src) +} + func (s *sourceManager) snapshot(ctx context.Context) { s.server.beginUpload(s.src) defer s.server.endUpload(s.src) @@ -182,7 +178,7 @@ func (s *sourceManager) snapshot(ctx context.Context) { log.Errorf("unable to create policy getter: %v", err) } - u.Progress = s + u.Progress = s.progress log.Infof("starting upload of %v", s.src) @@ -211,31 +207,26 @@ func (s *sourceManager) snapshot(ctx context.Context) { } } -func (s *sourceManager) findClosestNextSnapshotTime() time.Time { - nextSnapshotTime := time.Now().Add(oneDay) +func (s *sourceManager) findClosestNextSnapshotTime() *time.Time { + var nextSnapshotTime *time.Time - if s.pol != nil { - // compute next snapshot time based on interval - if interval := s.pol.SchedulingPolicy.IntervalSeconds; interval != 0 { - interval := time.Duration(interval) * time.Second - nt := s.lastSnapshot.StartTime.Add(interval).Truncate(interval) + // compute next snapshot time based on interval + if interval := s.pol.IntervalSeconds; interval != 0 { + interval := time.Duration(interval) * time.Second + nt := s.lastSnapshot.StartTime.Add(interval).Truncate(interval) + nextSnapshotTime = &nt + } - if nt.Before(nextSnapshotTime) { - nextSnapshotTime = nt - } + for _, tod := range s.pol.TimesOfDay { + nowLocalTime := time.Now().Local() + localSnapshotTime := time.Date(nowLocalTime.Year(), nowLocalTime.Month(), nowLocalTime.Day(), tod.Hour, tod.Minute, 0, 0, time.Local) + + if tod.Hour < nowLocalTime.Hour() || (tod.Hour == nowLocalTime.Hour() && tod.Minute < nowLocalTime.Minute()) { + localSnapshotTime = localSnapshotTime.Add(oneDay) } - for _, tod := range s.pol.SchedulingPolicy.TimesOfDay { - nowLocalTime := time.Now().Local() - localSnapshotTime := time.Date(nowLocalTime.Year(), nowLocalTime.Month(), nowLocalTime.Day(), tod.Hour, tod.Minute, 0, 0, time.Local) - - if tod.Hour < nowLocalTime.Hour() || (tod.Hour == nowLocalTime.Hour() && tod.Minute < nowLocalTime.Minute()) { - localSnapshotTime = localSnapshotTime.Add(oneDay) - } - - if localSnapshotTime.Before(nextSnapshotTime) { - nextSnapshotTime = localSnapshotTime - } + if nextSnapshotTime == nil || localSnapshotTime.Before(*nextSnapshotTime) { + nextSnapshotTime = &localSnapshotTime } } @@ -251,7 +242,7 @@ func (s *sourceManager) refreshStatus(ctx context.Context) { return } - s.pol = pol + s.pol = pol.SchedulingPolicy snapshots, err := snapshot.ListSnapshots(ctx, s.server.rep, s.src) if err != nil { @@ -266,17 +257,18 @@ func (s *sourceManager) refreshStatus(ctx context.Context) { s.lastSnapshot = snaps[0] s.nextSnapshotTime = s.findClosestNextSnapshotTime() } else { - s.nextSnapshotTime = time.Time{} + s.nextSnapshotTime = nil s.lastSnapshot = nil } } func newSourceManager(src snapshot.SourceInfo, server *Server) *sourceManager { m := &sourceManager{ - src: src, - server: server, - state: "UNKNOWN", - closed: make(chan struct{}), + src: src, + server: server, + state: "UNKNOWN", + closed: make(chan struct{}), + progress: &snapshotfs.CountingUploadProgress{}, } return m diff --git a/internal/serverapi/client.go b/internal/serverapi/client.go index 7fef48d37..c68931746 100644 --- a/internal/serverapi/client.go +++ b/internal/serverapi/client.go @@ -9,9 +9,12 @@ "encoding/json" "net/http" + "github.com/kopia/kopia/internal/kopialogging" "github.com/pkg/errors" ) +var log = kopialogging.Logger("kopia/client") + // DefaultUsername is the default username for Kopia server. const DefaultUsername = "kopia" @@ -27,6 +30,10 @@ func (c *Client) Get(path string, respPayload interface{}) error { return err } + if c.options.LogRequests { + log.Debugf("GET %v", c.options.BaseURL+path) + } + if c.options.Username != "" { req.SetBasicAuth(c.options.Username, c.options.Password) } @@ -56,6 +63,10 @@ func (c *Client) Post(path string, reqPayload, respPayload interface{}) error { return errors.Wrap(err, "unable to encode request") } + if c.options.LogRequests { + log.Infof("POST %v (%v bytes)", c.options.BaseURL+path, buf.Len()) + } + req, err := http.NewRequest("POST", c.options.BaseURL+path, &buf) if err != nil { return err @@ -97,6 +108,8 @@ type ClientOptions struct { TrustedServerCertificateFingerprint string RootCAs *x509.CertPool + + LogRequests bool } // NewClient creates a options.HTTPClient for connecting to Kopia HTTP API. diff --git a/internal/serverapi/client_wrappers.go b/internal/serverapi/client_wrappers.go new file mode 100644 index 000000000..2dca8cf41 --- /dev/null +++ b/internal/serverapi/client_wrappers.go @@ -0,0 +1,37 @@ +package serverapi + +import "context" + +func (c *Client) CreateRepository(ctx context.Context, req *CreateRequest) error { + return c.Post("repo/create", req, &StatusResponse{}) +} + +func (c *Client) ConnectToRepository(ctx context.Context, req *ConnectRequest) error { + return c.Post("repo/connect", req, &StatusResponse{}) +} + +func (c *Client) DisconnectFromRepository(ctx context.Context) error { + return c.Post("repo/disconnect", &Empty{}, &Empty{}) +} + +func (c *Client) Shutdown(ctx context.Context) { + _ = c.Post("shutdown", &Empty{}, &Empty{}) +} + +func (c *Client) Status(ctx context.Context) (*StatusResponse, error) { + resp := &StatusResponse{} + if err := c.Get("repo/status", resp); err != nil { + return nil, err + } + + return resp, nil +} + +func (c *Client) Sources(ctx context.Context) (*SourcesResponse, error) { + resp := &SourcesResponse{} + if err := c.Get("sources", resp); err != nil { + return nil, err + } + + return resp, nil +} diff --git a/internal/serverapi/serverapi.go b/internal/serverapi/serverapi.go index fe728a7dd..fac7f6b75 100644 --- a/internal/serverapi/serverapi.go +++ b/internal/serverapi/serverapi.go @@ -4,17 +4,24 @@ import ( "time" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" + "github.com/kopia/kopia/snapshot/snapshotfs" ) // StatusResponse is the response of 'status' HTTP API command. type StatusResponse struct { - ConfigFile string `json:"configFile"` - CacheDir string `json:"cacheDir"` - BlockFormatting content.FormattingOptions `json:"blockFormatting"` - Storage string `json:"storage"` + Connected bool `json:"connected"` + ConfigFile string `json:"configFile,omitempty"` + CacheDir string `json:"cacheDir,omitempty"` + Hash string `json:"hash,omitempty"` + Encryption string `json:"encryption,omitempty"` + Splitter string `json:"splitter,omitempty"` + MaxPackSize int `json:"maxPackSize,omitempty"` + Storage string `json:"storage,omitempty"` } // SourcesResponse is the response of 'sources' HTTP API command. @@ -24,18 +31,12 @@ type SourcesResponse struct { // SourceStatus describes the status of a single source. type SourceStatus struct { - Source snapshot.SourceInfo `json:"source"` - Status string `json:"status"` - Policy *policy.Policy `json:"policy"` - LastSnapshotSize int64 `json:"lastSnapshotSize,omitempty"` - LastSnapshotTime time.Time `json:"lastSnapshotTime,omitempty"` - NextSnapshotTime time.Time `json:"nextSnapshotTime,omitempty"` - - UploadStatus struct { - UploadingPath string `json:"path,omitempty"` - UploadingPathCompleted int64 `json:"pathCompleted,omitempty"` - UploadingPathTotal int64 `json:"pathTotal,omitempty"` - } `json:"upload"` + Source snapshot.SourceInfo `json:"source"` + Status string `json:"status"` + SchedulingPolicy policy.SchedulingPolicy `json:"schedule"` + LastSnapshotTime time.Time `json:"lastSnapshotTime,omitempty"` + NextSnapshotTime *time.Time `json:"nextSnapshotTime,omitempty"` + UploadCounters *snapshotfs.UploadCounters `json:"upload,omitempty"` } // PolicyListEntry describes single policy. @@ -54,6 +55,11 @@ type PoliciesResponse struct { type Empty struct { } +// ErrorResponse represents error response. +type ErrorResponse struct { + Error string `json:"error"` +} + // SourceActionResponse is a per-source response. type SourceActionResponse struct { Success bool `json:"success"` @@ -63,3 +69,18 @@ type SourceActionResponse struct { type MultipleSourceActionResponse struct { Sources map[string]SourceActionResponse `json:"sources"` } + +// CreateRequest contains request to create a repository in a given storage +type CreateRequest struct { + Storage blob.ConnectionInfo `json:"storage"` + Password string `json:"password"` + CacheOptions content.CachingOptions `json:"cacheOptions"` + NewRepositoryOptions repo.NewRepositoryOptions `json:"options"` +} + +// ConnectRequest contains request to connect to a repository. +type ConnectRequest struct { + Storage blob.ConnectionInfo `json:"storage"` + Password string `json:"password"` + CacheOptions content.CachingOptions `json:"cacheOptions"` +} diff --git a/repo/initialize.go b/repo/initialize.go index fc5b37e36..604b69e3d 100644 --- a/repo/initialize.go +++ b/repo/initialize.go @@ -27,10 +27,10 @@ // NewRepositoryOptions specifies options that apply to newly created repositories. // All fields are optional, when not provided, reasonable defaults will be used. type NewRepositoryOptions struct { - UniqueID []byte // force the use of particular unique ID - BlockFormat content.FormattingOptions - DisableHMAC bool - ObjectFormat object.Format // object format + UniqueID []byte `json:"uniqueID"` // force the use of particular unique ID + BlockFormat content.FormattingOptions `json:"blockFormat"` + DisableHMAC bool `json:"disableHMAC"` + ObjectFormat object.Format `json:"objectFormat"` // object format } // Initialize creates initial repository data structures in the specified storage with given credentials. diff --git a/repo/repository.go b/repo/repository.go index fbbdcb21b..603c027c8 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -28,8 +28,8 @@ type Repository struct { // Close closes the repository and releases all resources. func (r *Repository) Close(ctx context.Context) error { - if err := r.Manifests.Flush(ctx); err != nil { - return errors.Wrap(err, "error flushing manifests") + if err := r.Flush(ctx); err != nil { + return errors.Wrap(err, "error flushing") } if err := r.Content.Close(ctx); err != nil { diff --git a/tests/end_to_end_test/server_start_test.go b/tests/end_to_end_test/server_start_test.go index 4a64c8f1a..897e60d47 100644 --- a/tests/end_to_end_test/server_start_test.go +++ b/tests/end_to_end_test/server_start_test.go @@ -1,11 +1,14 @@ package endtoend_test import ( + "context" "strings" "testing" "time" "github.com/kopia/kopia/internal/serverapi" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/filesystem" "github.com/kopia/kopia/tests/testenv" ) @@ -42,20 +45,74 @@ func (s *serverParameters) ProcessOutput(l string) bool { func TestServerStart(t *testing.T) { t.Parallel() + ctx := context.Background() + e := testenv.NewCLITest(t) defer e.Cleanup(t) defer e.RunAndExpectSuccess(t, "repo", "disconnect") e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir) - // Start the server and wait for it to auto-shutdown in 3 seconds. - // If this does not work, we avoid starting a longer test. - e.RunAndExpectSuccess(t, "server", "start", "--ui", "--auto-shutdown=3s") - time.Sleep(3 * time.Second) + e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1) + e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1) var sp serverParameters - e.RunAndProcessStderr(t, sp.ProcessOutput, "server", "start", "--ui", "--random-password", "--tls-generate-cert", "--auto-shutdown=60s") + e.RunAndProcessStderr(t, sp.ProcessOutput, "server", "start", "--ui", "--address=localhost:0", "--random-password", "--tls-generate-cert", "--auto-shutdown=10s") + t.Logf("detected server parameters %#v", sp) + + cli, err := serverapi.NewClient(serverapi.ClientOptions{ + BaseURL: sp.baseURL, + Password: sp.password, + TrustedServerCertificateFingerprint: sp.sha256Fingerprint, + LogRequests: true, + }) + if err != nil { + t.Fatalf("unable to create API client") + } + + defer cli.Shutdown(ctx) // nolint:errcheck + + time.Sleep(1 * time.Second) + + st := verifyServerConnected(t, cli, true) + if got, want := st.Storage, "filesystem"; got != want { + t.Errorf("unexpected storage type: %v, want %v", got, want) + } + + sources, err := cli.Sources(ctx) + if err != nil { + t.Fatalf("error listing sources: %v", err) + } + + if got, want := len(sources.Sources), 1; got != want { + t.Errorf("unexpected number of sources %v, want %v", got, want) + } + + if got, want := sources.Sources[0].Source.Path, sharedTestDataDir1; got != want { + t.Errorf("unexpected source path: %v, want %v", got, want) + } +} + +func TestServerStartWithoutInitialRepository(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + e := testenv.NewCLITest(t) + defer e.Cleanup(t) + defer e.RunAndExpectSuccess(t, "repo", "disconnect") + + var sp serverParameters + + connInfo := blob.ConnectionInfo{ + Type: "filesystem", + Config: filesystem.Options{ + Path: e.RepoDir, + }, + } + + e.RunAndProcessStderr(t, sp.ProcessOutput, "server", "start", "--ui", "--address=localhost:0", "--random-password", "--tls-generate-cert", "--auto-shutdown=60s") t.Logf("detected server parameters %#v", sp) cli, err := serverapi.NewClient(serverapi.ClientOptions{ @@ -67,16 +124,48 @@ func TestServerStart(t *testing.T) { t.Fatalf("unable to create API client") } + defer cli.Shutdown(ctx) // nolint:errcheck + time.Sleep(1 * time.Second) - if err := cli.Get("status", &serverapi.Empty{}); err != nil { - t.Errorf("status error: %v", err) + verifyServerConnected(t, cli, false) + + if err = cli.CreateRepository(ctx, &serverapi.CreateRequest{ + Password: "foofoo", + Storage: connInfo, + }); err != nil { + t.Fatalf("create error: %v", err) } - // TODO - add more tests + verifyServerConnected(t, cli, true) - // explicit shutdown - if err := cli.Post("shutdown", &serverapi.Empty{}, &serverapi.Empty{}); err != nil { - t.Logf("expected shutdown error: %v", err) + if err = cli.DisconnectFromRepository(ctx); err != nil { + t.Fatalf("disconnect error: %v", err) } + + verifyServerConnected(t, cli, false) + + if err = cli.ConnectToRepository(ctx, &serverapi.ConnectRequest{ + Password: "foofoo", + Storage: connInfo, + }); err != nil { + t.Fatalf("create error: %v", err) + } + + verifyServerConnected(t, cli, true) +} + +func verifyServerConnected(t *testing.T, cli *serverapi.Client, want bool) *serverapi.StatusResponse { + t.Helper() + + st, err := cli.Status(context.Background()) + if err != nil { + t.Fatalf("status error: %v", err) + } + + if got := st.Connected; got != want { + t.Errorf("invalid status connected %v, want %v", st.Connected, want) + } + + return st }