Support for remote repository (#427)

Support for remote content repository where all contents and
manifests are fetched over HTTP(S) instead of locally
manipulating blob storage

* server: implement content and manifest access APIs
* apiclient: moved Kopia API client to separate package
* content: exposed content.ValidatePrefix()
* manifest: added JSON serialization attributes to EntryMetadata
* repo: changed repo.Open() to return Repository instead of *DirectRepository
* repo: added apiServerRepository
* cli: added 'kopia repository connect server'
  This sets up repository connection via the API server instead of
  directly-manipulated storage.
* server: add support for specifying a list of usernames/password via --htpasswd-file
* tests: added API server repository E2E test
* server: only return manifests (policies and snapshots) belonging to authenticated user
This commit is contained in:
Jarek Kowalski
2020-05-02 21:41:49 -07:00
committed by GitHub
parent 1377d057e4
commit be4b897579
38 changed files with 1048 additions and 327 deletions

View File

@@ -10,7 +10,7 @@
"github.com/pkg/errors"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
@@ -61,14 +61,14 @@ func noRepositoryAction(act func(ctx context.Context) error) func(ctx *kingpin.P
}
}
func serverAction(act func(ctx context.Context, cli *serverapi.Client) error) func(ctx *kingpin.ParseContext) error {
func serverAction(act func(ctx context.Context, cli *apiclient.KopiaAPIClient) error) func(ctx *kingpin.ParseContext) error {
return func(_ *kingpin.ParseContext) error {
opts, err := serverAPIClientOptions()
if err != nil {
return errors.Wrap(err, "unable to create API client options")
}
apiClient, err := serverapi.NewClient(opts)
apiClient, err := apiclient.NewKopiaAPIClient(opts)
if err != nil {
return errors.Wrap(err, "unable to create API client")
}

View File

@@ -43,7 +43,7 @@ func connectToStorageFromConfigFile(ctx context.Context) (blob.Storage, error) {
return nil, errors.Wrap(err, "unable to load config")
}
return blob.NewStorage(ctx, cfg.Storage)
return blob.NewStorage(ctx, *cfg.Storage)
}
func connectToStorageFromConfigToken(ctx context.Context) (blob.Storage, error) {

View File

@@ -0,0 +1,42 @@
package cli
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
)
var (
connectAPIServerCommand = connectCommand.Command("server", "Connect to a repository API Server.")
connectAPIServerURL = connectAPIServerCommand.Flag("url", "Server URL").Required().String()
connectAPIServerCertFingerprint = connectAPIServerCommand.Flag("server-cert-fingerprint", "Server certificate fingerprint").String()
)
func runConnectAPIServerCommand(ctx context.Context) error {
password, err := getPasswordFromFlags(ctx, false, false)
if err != nil {
return errors.Wrap(err, "getting password")
}
as := &repo.APIServerInfo{
BaseURL: *connectAPIServerURL,
TrustedServerCertificateFingerprint: *connectAPIServerCertFingerprint,
}
configFile := repositoryConfigFileName()
if err := repo.ConnectAPIServer(ctx, configFile, as, password, connectOptions()); err != nil {
return err
}
printStderr("Connected to repository API Server.\n")
maybeInitializeUpdateCheck(ctx)
return nil
}
func init() {
connectAPIServerCommand.Action(noRepositoryAction(runConnectAPIServerCommand))
}

View File

@@ -105,11 +105,13 @@ func populateRepository(ctx context.Context, password string) error {
printStdout("To change the policy use:\n kopia policy set --global <options>\n")
printStdout("or\n kopia policy set <dir> <options>\n")
p := maintenance.DefaultParams()
p.Owner = rep.Username() + "@" + rep.Hostname()
if dr, ok := rep.(*repo.DirectRepository); ok {
p := maintenance.DefaultParams()
p.Owner = rep.Username() + "@" + rep.Hostname()
if err := maintenance.SetParams(ctx, rep, &p); err != nil {
return errors.Wrap(err, "unable to set maintenance params")
if err := maintenance.SetParams(ctx, dr, &p); err != nil {
return errors.Wrap(err, "unable to set maintenance params")
}
}
return nil

View File

@@ -1,9 +1,9 @@
package cli
import (
"github.com/kopia/kopia/internal/serverapi"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
)
var (
@@ -12,12 +12,12 @@
serverPassword = serverCommands.Flag("server-password", "HTTP server password (basic auth)").Envar("KOPIA_SERVER_PASSWORD").String()
)
func serverAPIClientOptions() (serverapi.ClientOptions, error) {
func serverAPIClientOptions() (apiclient.Options, error) {
if *serverAddress == "" {
return serverapi.ClientOptions{}, errors.Errorf("missing server address")
return apiclient.Options{}, errors.Errorf("missing server address")
}
return serverapi.ClientOptions{
return apiclient.Options{
BaseURL: *serverAddress,
Username: *serverUsername,
Password: *serverPassword,

View File

@@ -3,6 +3,7 @@
import (
"context"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
)
@@ -14,6 +15,6 @@ func init() {
serverCancelUploadCommand.Action(serverAction(runServerCancelUpload))
}
func runServerCancelUpload(ctx context.Context, cli *serverapi.Client) error {
func runServerCancelUpload(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
return cli.Post(ctx, "sources/cancel", &serverapi.Empty{}, &serverapi.Empty{})
}

View File

@@ -3,6 +3,7 @@
import (
"context"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
)
@@ -14,6 +15,6 @@ func init() {
serverFlushCommand.Action(serverAction(runServerFlush))
}
func runServerFlush(ctx context.Context, cli *serverapi.Client) error {
func runServerFlush(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
return cli.Post(ctx, "flush", &serverapi.Empty{}, &serverapi.Empty{})
}

View File

@@ -3,6 +3,7 @@
import (
"context"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
)
@@ -14,6 +15,6 @@ func init() {
serverPauseCommand.Action(serverAction(runServerPause))
}
func runServerPause(ctx context.Context, cli *serverapi.Client) error {
func runServerPause(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
return cli.Post(ctx, "sources/pause", &serverapi.Empty{}, &serverapi.Empty{})
}

View File

@@ -3,6 +3,7 @@
import (
"context"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
)
@@ -14,6 +15,6 @@ func init() {
serverRefreshCommand.Action(serverAction(runServerRefresh))
}
func runServerRefresh(ctx context.Context, cli *serverapi.Client) error {
func runServerRefresh(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
return cli.Post(ctx, "refresh", &serverapi.Empty{}, &serverapi.Empty{})
}

View File

@@ -3,6 +3,7 @@
import (
"context"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
)
@@ -14,6 +15,6 @@ func init() {
serverResumeCommand.Action(serverAction(runServerResume))
}
func runServerResume(ctx context.Context, cli *serverapi.Client) error {
func runServerResume(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
return cli.Post(ctx, "sources/resume", &serverapi.Empty{}, &serverapi.Empty{})
}

View File

@@ -16,6 +16,8 @@
"strings"
"time"
htpasswd "github.com/tg123/go-htpasswd"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/pkg/errors"
prom "github.com/prometheus/client_golang/prometheus"
@@ -32,6 +34,7 @@
serverStartRandomPassword = serverStartCommand.Flag("random-password", "Generate random password and print to stderr").Hidden().Bool()
serverStartAutoShutdown = serverStartCommand.Flag("auto-shutdown", "Auto shutdown the server if API requests not received within given time").Hidden().Duration()
serverStartHtpasswdFile = serverStartCommand.Flag("htpasswd-file", "Path to htpasswd file that contains allowed user@hostname entries").Hidden().ExistingFile()
)
func init() {
@@ -75,7 +78,10 @@ func runServer(ctx context.Context, rep repo.Repository) error {
}
})
mux = requireCredentials(mux)
mux, err = requireCredentials(mux)
if err != nil {
return errors.Wrap(err, "unable to setup credentials")
}
// init prometheus after adding interceptors that require credentials, so that this
// handler can be called without auth
@@ -187,12 +193,24 @@ func serveIndexFileForKnownUIRoutes(fs http.FileSystem) http.Handler {
})
}
func requireCredentials(handler http.Handler) *http.ServeMux {
if *serverPassword != "" {
handler = requireAuth{handler, *serverUsername, *serverPassword}
}
func requireCredentials(handler http.Handler) (*http.ServeMux, error) {
switch {
case *serverStartHtpasswdFile != "":
f, err := htpasswd.New(*serverStartHtpasswdFile, htpasswd.DefaultSystems, nil)
if err != nil {
return nil, err
}
if *serverStartRandomPassword {
handler = requireAuth{inner: handler, htpasswdFile: f}
case *serverPassword != "":
handler = requireAuth{
inner: handler,
expectedUsername: *serverUsername,
expectedPassword: *serverPassword,
}
case *serverStartRandomPassword:
// generate very long random one-time password
b := make([]byte, 32)
io.ReadFull(rand.Reader, b) //nolint:errcheck
@@ -202,19 +220,24 @@ func requireCredentials(handler http.Handler) *http.ServeMux {
// print it to the stderr bypassing any log file so that the user or calling process can connect
fmt.Fprintln(os.Stderr, "SERVER PASSWORD:", randomPassword)
handler = requireAuth{handler, *serverUsername, randomPassword}
handler = requireAuth{
inner: handler,
expectedUsername: *serverUsername,
expectedPassword: randomPassword,
}
}
mux := http.NewServeMux()
mux.Handle("/", handler)
return mux
return mux, nil
}
type requireAuth struct {
inner http.Handler
expectedUsername string
expectedPassword string
htpasswdFile *htpasswd.File
}
func (a requireAuth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -226,8 +249,16 @@ func (a requireAuth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
valid := subtle.ConstantTimeCompare([]byte(user), []byte(a.expectedUsername)) *
subtle.ConstantTimeCompare([]byte(pass), []byte(a.expectedPassword))
var valid int
if a.htpasswdFile != nil {
if a.htpasswdFile.Match(user, pass) {
valid = 1
}
} else {
valid = subtle.ConstantTimeCompare([]byte(user), []byte(a.expectedUsername)) *
subtle.ConstantTimeCompare([]byte(pass), []byte(a.expectedPassword))
}
if valid != 1 {
w.Header().Set("WWW-Authenticate", `Basic realm="Kopia"`)

View File

@@ -4,6 +4,7 @@
"context"
"fmt"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
)
@@ -15,9 +16,9 @@ func init() {
serverStatusCommand.Action(serverAction(runServerStatus))
}
func runServerStatus(ctx context.Context, cli *serverapi.Client) error {
func runServerStatus(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
var status serverapi.SourcesResponse
if err := cli.Get(ctx, "sources", &status); err != nil {
if err := cli.Get(ctx, "sources", nil, &status); err != nil {
return err
}

View File

@@ -4,6 +4,7 @@
"context"
"fmt"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
)
@@ -15,11 +16,11 @@ func init() {
serverStartUploadCommand.Action(serverAction(runServerStartUpload))
}
func runServerStartUpload(ctx context.Context, cli *serverapi.Client) error {
func runServerStartUpload(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
return triggerActionOnMatchingSources(ctx, cli, "sources/upload")
}
func triggerActionOnMatchingSources(ctx context.Context, cli *serverapi.Client, path string) error {
func triggerActionOnMatchingSources(ctx context.Context, cli *apiclient.KopiaAPIClient, path string) error {
var resp serverapi.MultipleSourceActionResponse
if err := cli.Post(ctx, path, &serverapi.Empty{}, &resp); err != nil {

View File

@@ -9,7 +9,6 @@
"os"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
)
func main() {
@@ -28,15 +27,4 @@ func main() {
defer r.Close(ctx) //nolint:errcheck
uploadAndDownloadObjects(ctx, r)
// Now list contents found in the repository.
if err := r.Content.IterateContents(
ctx,
content.IterateOptions{},
func(ci content.Info) error {
log.Printf("found content %v", ci)
return nil
}); err != nil {
log.Printf("err: %v", err)
}
}

1
go.mod
View File

@@ -40,6 +40,7 @@ require (
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
github.com/stretchr/testify v1.4.0
github.com/studio-b12/gowebdav v0.0.0-20200303150724-9380631c29a1
github.com/tg123/go-htpasswd v1.0.0
github.com/zalando/go-keyring v0.0.0-20200121091418-667557018717
go.opencensus.io v0.22.3
gocloud.dev v0.19.0

3
go.sum
View File

@@ -485,6 +485,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/studio-b12/gowebdav v0.0.0-20200303150724-9380631c29a1 h1:TPyHV/OgChqNcnYqCoCvIFjR9TU60gFXXBKnhOBzVEI=
github.com/studio-b12/gowebdav v0.0.0-20200303150724-9380631c29a1/go.mod h1:gCcfDlA1Y7GqOaeEKw5l9dOGx1VLdc/HuQSlQAaZ30s=
github.com/tg123/go-htpasswd v1.0.0 h1:Ze/pZsz73JiCwXIyJBPvNs75asKBgfodCf8iTEkgkXs=
github.com/tg123/go-htpasswd v1.0.0/go.mod h1:eQTgl67UrNKQvEPKrDLGBssjVwYQClFZjALVLhIv8C0=
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
@@ -518,6 +520,7 @@ gocloud.dev v0.19.0/go.mod h1:SmKwiR8YwIMMJvQBKLsC3fHNyMwXLw3PMDO+VVteJMI=
golang.org/x/arch v0.0.0-20190909030613-46d78d1859ac/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181106171534-e4dc69e5b2fd/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

View File

@@ -0,0 +1,211 @@
// Package apiclient implements a client for connecting to Kopia HTTP API server.
package apiclient
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/logging"
)
var log = logging.GetContextLoggerFunc("client")
// KopiaAPIClient provides helper methods for communicating with Kopia API server.
type KopiaAPIClient struct {
BaseURL string
HTTPClient *http.Client
}
// Get is a helper that performs HTTP GET on a URL with the specified suffix and decodes the response
// onto respPayload which must be a pointer to byte slice or JSON-serializable structure.
func (c *KopiaAPIClient) Get(ctx context.Context, urlSuffix string, onNotFound error, respPayload interface{}) error {
return c.runRequest(ctx, http.MethodGet, c.BaseURL+urlSuffix, onNotFound, nil, respPayload)
}
// Post is a helper that performs HTTP POST on a URL with the specified body from reqPayload and decodes the response
// onto respPayload which must be a pointer to byte slice or JSON-serializable structure.
func (c *KopiaAPIClient) Post(ctx context.Context, urlSuffix string, reqPayload, respPayload interface{}) error {
return c.runRequest(ctx, http.MethodPost, c.BaseURL+urlSuffix, nil, reqPayload, respPayload)
}
// Put is a helper that performs HTTP PUT on a URL with the specified body from reqPayload and decodes the response
// onto respPayload which must be a pointer to byte slice or JSON-serializable structure.
func (c *KopiaAPIClient) Put(ctx context.Context, urlSuffix string, reqPayload, respPayload interface{}) error {
return c.runRequest(ctx, http.MethodPut, c.BaseURL+urlSuffix, nil, reqPayload, respPayload)
}
// Delete is a helper that performs HTTP DELETE on a URL with the specified body from reqPayload and decodes the response
// onto respPayload which must be a pointer to byte slice or JSON-serializable structure.
func (c *KopiaAPIClient) Delete(ctx context.Context, urlSuffix string, reqPayload, respPayload interface{}) error {
return c.runRequest(ctx, http.MethodDelete, c.BaseURL+urlSuffix, nil, reqPayload, respPayload)
}
func (c *KopiaAPIClient) runRequest(ctx context.Context, method, url string, notFoundError error, reqPayload, respPayload interface{}) error {
payload, contentType, err := requestReader(reqPayload)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, method, url, payload)
if err != nil {
return err
}
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode == http.StatusNotFound && notFoundError != nil {
return notFoundError
}
return decodeResponse(resp, respPayload)
}
func requestReader(reqPayload interface{}) (io.Reader, string, error) {
if reqPayload == nil {
return nil, "", nil
}
if bs, ok := reqPayload.([]byte); ok {
return bytes.NewReader(bs), "application/octet-stream", nil
}
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(reqPayload); err != nil {
return nil, "", errors.Wrap(err, "unable to serialize JSON")
}
return bytes.NewReader(b.Bytes()), "application/json", nil
}
func decodeResponse(resp *http.Response, respPayload interface{}) error {
if resp.StatusCode != http.StatusOK {
return errors.Errorf("server error: %v", resp.Status)
}
if respPayload == nil {
return nil
}
if b, ok := respPayload.(*[]byte); ok {
v, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Wrap(err, "unable to read response")
}
*b = v
} else if err := json.NewDecoder(resp.Body).Decode(respPayload); err != nil {
return errors.Wrap(err, "unable to parse JSON response")
}
return nil
}
// Options encapsulates all optional parameters for KopiaAPIClient.
type Options struct {
BaseURL string
Username string
Password string
TrustedServerCertificateFingerprint string
LogRequests bool
}
// NewKopiaAPIClient creates a client for connecting to Kopia HTTP API.
// nolint:gocritic
func NewKopiaAPIClient(options Options) (*KopiaAPIClient, error) {
var transport http.RoundTripper
// override transport which trusts only one certificate
if f := options.TrustedServerCertificateFingerprint; f != "" {
t2 := http.DefaultTransport.(*http.Transport).Clone()
t2.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true, //nolint:gosec
VerifyPeerCertificate: verifyPeerCertificate(f),
}
transport = t2
} else {
transport = http.DefaultTransport
}
// wrap with a round-tripper that provides basic authentication
if options.Username != "" || options.Password != "" {
transport = basicAuthTransport{transport, options.Username, options.Password}
}
if options.LogRequests {
transport = loggingTransport{transport}
}
return &KopiaAPIClient{
options.BaseURL + "/api/v1/",
&http.Client{
Transport: transport,
},
}, nil
}
func verifyPeerCertificate(sha256Fingerprint string) func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
return func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
for _, c := range rawCerts {
h := sha256.Sum256(c)
if hex.EncodeToString(h[:]) == sha256Fingerprint {
return nil
}
}
return errors.Errorf("can't find certificate matching SHA256 fingerprint %q", sha256Fingerprint)
}
}
type basicAuthTransport struct {
base http.RoundTripper
username string
password string
}
func (t basicAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.SetBasicAuth(t.username, t.password)
return t.base.RoundTrip(req)
}
type loggingTransport struct {
base http.RoundTripper
}
func (t loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
t0 := time.Now()
resp, err := t.base.RoundTrip(req)
if err != nil {
log(req.Context()).Debugf("%v %v took %v and failed with %v", req.Method, req.URL, time.Since(t0), err)
return nil, err
}
log(req.Context()).Debugf("%v %v took %v and returned %v", req.Method, req.URL, time.Since(t0), resp.Status)
return resp, nil
}

View File

@@ -0,0 +1,30 @@
// Package remoterepoapi contains requests and responses for remote repository API.
package remoterepoapi
import (
"encoding/json"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo/object"
)
// Parameters encapsulates all parameters for repository.
// returned by /api/v1/repo/parameters
type Parameters struct {
HashFunction string `json:"hash"`
HMACSecret []byte `json:"hmacSecret"`
object.Format
}
// GetHashFunction returns the name of the hash function for remote repository.
func (p *Parameters) GetHashFunction() string { return p.HashFunction }
// GetHMACSecret returns the HMAC secret for the remote repository.
func (p *Parameters) GetHMACSecret() []byte { return p.HMACSecret }
// ManifestWithMetadata represents manifest payload and metadata
type ManifestWithMetadata struct {
Payload json.RawMessage `json:"payload"`
Metadata *manifest.EntryMetadata `json:"metadata"`
}

View File

@@ -76,11 +76,13 @@ func (e *Environment) Setup(t *testing.T, opts ...func(*repo.NewRepositoryOption
e.connected = true
e.Repository, err = repo.Open(ctx, e.configFile(), masterPassword, &repo.Options{})
rep, err := repo.Open(ctx, e.configFile(), masterPassword, &repo.Options{})
if err != nil {
t.Fatalf("can't open: %v", err)
}
e.Repository = rep.(*repo.DirectRepository)
return e
}
@@ -117,10 +119,12 @@ func (e *Environment) MustReopen(t *testing.T) {
t.Fatalf("close error: %v", err)
}
e.Repository, err = repo.Open(testlogging.Context(t), e.configFile(), masterPassword, &repo.Options{})
rep, err := repo.Open(testlogging.Context(t), e.configFile(), masterPassword, &repo.Options{})
if err != nil {
t.Fatalf("err: %v", err)
}
e.Repository = rep.(*repo.DirectRepository)
}
// MustOpenAnother opens another repository backend by the same storage.

View File

@@ -24,11 +24,13 @@ func TestTimeFuncWiring(t *testing.T) {
ft := faketime.NewTimeAdvance(time.Date(2018, time.February, 6, 0, 0, 0, 0, time.UTC))
// Re open with injected time
r, err := repo.Open(ctx, env.Repository.ConfigFile, masterPassword, &repo.Options{TimeNowFunc: ft.NowFunc()})
rep, err := repo.Open(ctx, env.Repository.ConfigFile, masterPassword, &repo.Options{TimeNowFunc: ft.NowFunc()})
if err != nil {
t.Fatal("Failed to open repo:", err)
}
r := rep.(*repo.DirectRepository)
env.Repository = r
// verify wiring for the repo layer

View File

@@ -0,0 +1,76 @@
package server
import (
"context"
"io/ioutil"
"net/http"
"github.com/gorilla/mux"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
)
func (s *Server) handleContentGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
dr, ok := s.rep.(*repo.DirectRepository)
if !ok {
return nil, notFoundError("content not found")
}
cid := content.ID(mux.Vars(r)["contentID"])
data, err := dr.Content.GetContent(ctx, cid)
if err == content.ErrContentNotFound {
return nil, notFoundError("content not found")
}
return data, nil
}
func (s *Server) handleContentInfo(ctx context.Context, r *http.Request) (interface{}, *apiError) {
dr, ok := s.rep.(*repo.DirectRepository)
if !ok {
return nil, notFoundError("content not found")
}
cid := content.ID(mux.Vars(r)["contentID"])
ci, err := dr.Content.ContentInfo(ctx, cid)
switch err {
case nil:
return ci, nil
case content.ErrContentNotFound:
return nil, notFoundError("content not found")
default:
return nil, internalServerError(err)
}
}
func (s *Server) handleContentPut(ctx context.Context, r *http.Request) (interface{}, *apiError) {
dr, ok := s.rep.(*repo.DirectRepository)
if !ok {
return nil, notFoundError("content not found")
}
cid := content.ID(mux.Vars(r)["contentID"])
prefix := cid.Prefix()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
}
actualCID, err := dr.Content.WriteContent(ctx, data, prefix)
if err != nil {
return nil, internalServerError(err)
}
if actualCID != cid {
return nil, requestError(serverapi.ErrorMalformedRequest, "mismatched content ID")
}
return &serverapi.Empty{}, nil
}

View File

@@ -16,6 +16,10 @@ func requestError(apiErrorCode serverapi.APIErrorCode, message string) *apiError
return &apiError{400, apiErrorCode, message}
}
func notFoundError(message string) *apiError {
return &apiError{404, serverapi.ErrorNotFound, message}
}
func internalServerError(err error) *apiError {
return &apiError{500, serverapi.ErrorInternal, fmt.Sprintf("internal server error: %v", err)}
}

View File

@@ -0,0 +1,109 @@
package server
import (
"context"
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo/manifest"
)
func (s *Server) handleManifestGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
// password already validated by a wrapper, no need to check here.
userAtHost, _, _ := r.BasicAuth()
mid := manifest.ID(mux.Vars(r)["manifestID"])
var data json.RawMessage
md, err := s.rep.GetManifest(ctx, mid, &data)
if err == manifest.ErrNotFound {
return nil, notFoundError("manifest not found")
}
if err != nil {
return nil, internalServerError(err)
}
if !manifestMatchesUser(md, userAtHost) {
return nil, notFoundError("manifest not found")
}
return &remoterepoapi.ManifestWithMetadata{
Payload: data,
Metadata: md,
}, nil
}
func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) {
mid := manifest.ID(mux.Vars(r)["manifestID"])
err := s.rep.DeleteManifest(ctx, mid)
if err == manifest.ErrNotFound {
return nil, notFoundError("manifest not found")
}
if err != nil {
return nil, internalServerError(err)
}
return &serverapi.Empty{}, nil
}
func (s *Server) handleManifestList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
// password already validated by a wrapper, no need to check here.
userAtHost, _, _ := r.BasicAuth()
labels := map[string]string{}
for k, v := range r.URL.Query() {
labels[k] = v[0]
}
m, err := s.rep.FindManifests(ctx, labels)
if err != nil {
return nil, internalServerError(err)
}
return filterManifests(m, userAtHost), nil
}
func manifestMatchesUser(m *manifest.EntryMetadata, userAtHost string) bool {
if userAtHost == "" {
return true
}
actualUser := m.Labels["username"] + "@" + m.Labels["hostname"]
return actualUser == userAtHost
}
func filterManifests(manifests []*manifest.EntryMetadata, userAtHost string) []*manifest.EntryMetadata {
result := []*manifest.EntryMetadata{}
for _, m := range manifests {
if manifestMatchesUser(m, userAtHost) {
result = append(result, m)
}
}
return result
}
func (s *Server) handleManifestCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
var req remoterepoapi.ManifestWithMetadata
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request")
}
id, err := s.rep.PutManifest(ctx, req.Metadata.Labels, req.Payload)
if err != nil {
return nil, internalServerError(err)
}
return &manifest.EntryMetadata{ID: id}, nil
}

View File

@@ -2,19 +2,15 @@
import (
"net/http"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/kopia/kopia/repo/object"
)
func (s *Server) handleObjectGet(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "incompatible HTTP method", http.StatusMethodNotAllowed)
return
}
oidstr := r.URL.Path[strings.LastIndex(r.URL.Path, "/")+1:]
oidstr := mux.Vars(r)["objectID"]
oid, err := object.ParseID(oidstr)
if err != nil {

View File

@@ -8,6 +8,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
@@ -19,6 +20,23 @@
"github.com/kopia/kopia/snapshot/policy"
)
func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request) (interface{}, *apiError) {
dr, ok := s.rep.(*repo.DirectRepository)
if !ok {
return &serverapi.StatusResponse{
Connected: false,
}, nil
}
rp := &remoterepoapi.Parameters{
HashFunction: dr.Content.Format.Hash,
HMACSecret: dr.Content.Format.HMACSecret,
Format: dr.Objects.Format,
}
return rp, nil
}
func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) {
if s.rep == nil {
return &serverapi.StatusResponse{

View File

@@ -45,32 +45,42 @@ func (s *Server) APIHandlers() http.Handler {
m := mux.NewRouter()
// sources
m.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesList)).Methods("GET")
m.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesCreate)).Methods("POST")
m.HandleFunc("/api/v1/sources/upload", s.handleAPI(s.handleUpload)).Methods("POST")
m.HandleFunc("/api/v1/sources/cancel", s.handleAPI(s.handleCancel)).Methods("POST")
m.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesList)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/sources", s.handleAPI(s.handleSourcesCreate)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/sources/upload", s.handleAPI(s.handleUpload)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/sources/cancel", s.handleAPI(s.handleCancel)).Methods(http.MethodPost)
// snapshots
m.HandleFunc("/api/v1/snapshots", s.handleAPI(s.handleSnapshotList)).Methods("GET")
m.HandleFunc("/api/v1/snapshots", s.handleAPI(s.handleSnapshotList)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyGet)).Methods("GET")
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyPut)).Methods("PUT")
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyDelete)).Methods("DELETE")
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyGet)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyPut)).Methods(http.MethodPut)
m.HandleFunc("/api/v1/policy", s.handleAPI(s.handlePolicyDelete)).Methods(http.MethodDelete)
m.HandleFunc("/api/v1/policies", s.handleAPI(s.handlePolicyList)).Methods("GET")
m.HandleFunc("/api/v1/policies", s.handleAPI(s.handlePolicyList)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh)).Methods("POST")
m.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush)).Methods("POST")
m.HandleFunc("/api/v1/shutdown", s.handleAPIPossiblyNotConnected(s.handleShutdown)).Methods("POST")
m.HandleFunc("/api/v1/refresh", s.handleAPI(s.handleRefresh)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/flush", s.handleAPI(s.handleFlush)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/shutdown", s.handleAPIPossiblyNotConnected(s.handleShutdown)).Methods(http.MethodPost)
m.PathPrefix("/api/v1/objects/").HandlerFunc(s.handleObjectGet).Methods("GET")
m.HandleFunc("/api/v1/objects/{objectID}", s.handleObjectGet).Methods(http.MethodGet)
m.HandleFunc("/api/v1/repo/status", s.handleAPIPossiblyNotConnected(s.handleRepoStatus)).Methods("GET")
m.HandleFunc("/api/v1/repo/connect", s.handleAPIPossiblyNotConnected(s.handleRepoConnect)).Methods("POST")
m.HandleFunc("/api/v1/repo/create", s.handleAPIPossiblyNotConnected(s.handleRepoCreate)).Methods("POST")
m.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect)).Methods("POST")
m.HandleFunc("/api/v1/repo/algorithms", s.handleAPIPossiblyNotConnected(s.handleRepoSupportedAlgorithms)).Methods("GET")
m.HandleFunc("/api/v1/repo/sync", s.handleAPI(s.handleRepoSync)).Methods("POST")
m.HandleFunc("/api/v1/repo/status", s.handleAPIPossiblyNotConnected(s.handleRepoStatus)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/repo/connect", s.handleAPIPossiblyNotConnected(s.handleRepoConnect)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/repo/create", s.handleAPIPossiblyNotConnected(s.handleRepoCreate)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/repo/disconnect", s.handleAPI(s.handleRepoDisconnect)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/repo/algorithms", s.handleAPIPossiblyNotConnected(s.handleRepoSupportedAlgorithms)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/repo/sync", s.handleAPI(s.handleRepoSync)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/repo/parameters", s.handleAPI(s.handleRepoParameters)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/contents/{contentID}", s.handleAPI(s.handleContentInfo)).Methods(http.MethodGet).Queries("info", "1")
m.HandleFunc("/api/v1/contents/{contentID}", s.handleAPI(s.handleContentGet)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/contents/{contentID}", s.handleAPI(s.handleContentPut)).Methods(http.MethodPut)
m.HandleFunc("/api/v1/manifests/{manifestID}", s.handleAPI(s.handleManifestGet)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/manifests/{manifestID}", s.handleAPI(s.handleManifestDelete)).Methods(http.MethodDelete)
m.HandleFunc("/api/v1/manifests", s.handleAPI(s.handleManifestCreate)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/manifests", s.handleAPI(s.handleManifestList)).Methods(http.MethodGet)
return m
}
@@ -101,7 +111,11 @@ func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *ht
v, err := f(ctx, r)
if err == nil {
if err := e.Encode(v); err != nil {
if b, ok := v.([]byte); ok {
if _, err := w.Write(b); err != nil {
log(ctx).Warningf("error writing response: %v", err)
}
} else if err := e.Encode(v); err != nil {
log(ctx).Warningf("error encoding response: %v", err)
}

View File

@@ -1,166 +0,0 @@
package serverapi
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/json"
"net/http"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/logging"
)
var log = logging.GetContextLoggerFunc("kopia/client")
// DefaultUsername is the default username for Kopia server.
const DefaultUsername = "kopia"
// Client provides helper methods for communicating with Kopia API serevr.
type Client struct {
options ClientOptions
}
// HTTPClient returns HTTP client that connects to the server.
func (c *Client) HTTPClient() *http.Client {
return c.options.HTTPClient
}
// Get sends HTTP GET request and decodes the JSON response into the provided payload structure.
func (c *Client) Get(ctx context.Context, path string, respPayload interface{}) error {
req, err := http.NewRequest("GET", c.options.BaseURL+path, nil)
if err != nil {
return err
}
if c.options.LogRequests {
log(ctx).Debugf("GET %v", c.options.BaseURL+path)
}
if c.options.Username != "" {
req.SetBasicAuth(c.options.Username, c.options.Password)
}
resp, err := c.options.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
return errors.Errorf("invalid server response: %v", resp.Status)
}
if err := json.NewDecoder(resp.Body).Decode(respPayload); err != nil {
return errors.Wrap(err, "malformed server response")
}
return nil
}
// Post sends HTTP post request with given JSON payload structure and decodes the JSON response into another payload structure.
func (c *Client) Post(ctx context.Context, path string, reqPayload, respPayload interface{}) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(reqPayload); err != nil {
return errors.Wrap(err, "unable to encode request")
}
if c.options.LogRequests {
log(ctx).Infof("POST %v (%v bytes)", c.options.BaseURL+path, buf.Len())
}
req, err := http.NewRequest("POST", c.options.BaseURL+path, &buf)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
if c.options.Username != "" {
req.SetBasicAuth(c.options.Username, c.options.Password)
}
resp, err := c.options.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
return errors.Errorf("invalid server response: %v", resp.Status)
}
if err := json.NewDecoder(resp.Body).Decode(respPayload); err != nil {
return errors.Wrap(err, "malformed server response")
}
return nil
}
// ClientOptions encapsulates all optional API options.HTTPClient options.
type ClientOptions struct {
BaseURL string
HTTPClient *http.Client
Username string
Password string
TrustedServerCertificateFingerprint string
RootCAs *x509.CertPool
LogRequests bool
}
// NewClient creates a options.HTTPClient for connecting to Kopia HTTP API.
// nolint:gocritic
func NewClient(options ClientOptions) (*Client, error) {
if options.HTTPClient == nil {
transport := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: options.RootCAs,
},
}
if f := options.TrustedServerCertificateFingerprint; f != "" {
if options.RootCAs != nil {
return nil, errors.Errorf("can't set both RootCAs and TrustedServerCertificateFingerprint")
}
transport.TLSClientConfig.InsecureSkipVerify = true
transport.TLSClientConfig.VerifyPeerCertificate = verifyPeerCertificate(f)
}
options.HTTPClient = &http.Client{
Transport: transport,
}
}
if options.Username == "" {
options.Username = DefaultUsername
}
options.BaseURL += "/api/v1/"
return &Client{options}, nil
}
func verifyPeerCertificate(sha256Fingerprint string) func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
return func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
for _, c := range rawCerts {
h := sha256.Sum256(c)
if hex.EncodeToString(h[:]) == sha256Fingerprint {
return nil
}
}
return errors.Errorf("can't find certificate matching SHA256 fingerprint %q", sha256Fingerprint)
}
}

View File

@@ -2,17 +2,15 @@
import (
"context"
"io/ioutil"
"net/http"
"strings"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
)
// CreateSnapshotSource creates snapshot source with a given path.
func (c *Client) CreateSnapshotSource(ctx context.Context, req *CreateSnapshotSourceRequest) (*CreateSnapshotSourceResponse, error) {
func CreateSnapshotSource(ctx context.Context, c *apiclient.KopiaAPIClient, req *CreateSnapshotSourceRequest) (*CreateSnapshotSourceResponse, error) {
resp := &CreateSnapshotSourceResponse{}
if err := c.Post(ctx, "sources", req, resp); err != nil {
return nil, err
@@ -22,7 +20,7 @@ func (c *Client) CreateSnapshotSource(ctx context.Context, req *CreateSnapshotSo
}
// UploadSnapshots triggers snapshot upload on matching snapshots.
func (c *Client) UploadSnapshots(ctx context.Context, match *snapshot.SourceInfo) (*MultipleSourceActionResponse, error) {
func UploadSnapshots(ctx context.Context, c *apiclient.KopiaAPIClient, match *snapshot.SourceInfo) (*MultipleSourceActionResponse, error) {
resp := &MultipleSourceActionResponse{}
if err := c.Post(ctx, "sources/upload"+matchSourceParameters(match), &Empty{}, resp); err != nil {
return nil, err
@@ -32,7 +30,7 @@ func (c *Client) UploadSnapshots(ctx context.Context, match *snapshot.SourceInfo
}
// CancelUpload cancels snapshot upload on matching snapshots.
func (c *Client) CancelUpload(ctx context.Context, match *snapshot.SourceInfo) (*MultipleSourceActionResponse, error) {
func CancelUpload(ctx context.Context, c *apiclient.KopiaAPIClient, match *snapshot.SourceInfo) (*MultipleSourceActionResponse, error) {
resp := &MultipleSourceActionResponse{}
if err := c.Post(ctx, "sources/cancel"+matchSourceParameters(match), &Empty{}, resp); err != nil {
return nil, err
@@ -42,29 +40,29 @@ func (c *Client) CancelUpload(ctx context.Context, match *snapshot.SourceInfo) (
}
// CreateRepository invokes the 'repo/create' API.
func (c *Client) CreateRepository(ctx context.Context, req *CreateRepositoryRequest) error {
func CreateRepository(ctx context.Context, c *apiclient.KopiaAPIClient, req *CreateRepositoryRequest) error {
return c.Post(ctx, "repo/create", req, &StatusResponse{})
}
// ConnectToRepository invokes the 'repo/connect' API.
func (c *Client) ConnectToRepository(ctx context.Context, req *ConnectRepositoryRequest) error {
func ConnectToRepository(ctx context.Context, c *apiclient.KopiaAPIClient, req *ConnectRepositoryRequest) error {
return c.Post(ctx, "repo/connect", req, &StatusResponse{})
}
// DisconnectFromRepository invokes the 'repo/disconnect' API.
func (c *Client) DisconnectFromRepository(ctx context.Context) error {
func DisconnectFromRepository(ctx context.Context, c *apiclient.KopiaAPIClient) error {
return c.Post(ctx, "repo/disconnect", &Empty{}, &Empty{})
}
// Shutdown invokes the 'repo/shutdown' API.
func (c *Client) Shutdown(ctx context.Context) {
func Shutdown(ctx context.Context, c *apiclient.KopiaAPIClient) {
_ = c.Post(ctx, "shutdown", &Empty{}, &Empty{})
}
// Status invokes the 'repo/status' API.
func (c *Client) Status(ctx context.Context) (*StatusResponse, error) {
func Status(ctx context.Context, c *apiclient.KopiaAPIClient) (*StatusResponse, error) {
resp := &StatusResponse{}
if err := c.Get(ctx, "repo/status", resp); err != nil {
if err := c.Get(ctx, "repo/status", nil, resp); err != nil {
return nil, err
}
@@ -72,9 +70,9 @@ func (c *Client) Status(ctx context.Context) (*StatusResponse, error) {
}
// ListSources lists the snapshot sources managed by the server.
func (c *Client) ListSources(ctx context.Context, match *snapshot.SourceInfo) (*SourcesResponse, error) {
func ListSources(ctx context.Context, c *apiclient.KopiaAPIClient, match *snapshot.SourceInfo) (*SourcesResponse, error) {
resp := &SourcesResponse{}
if err := c.Get(ctx, "sources"+matchSourceParameters(match), resp); err != nil {
if err := c.Get(ctx, "sources"+matchSourceParameters(match), nil, resp); err != nil {
return nil, err
}
@@ -82,9 +80,9 @@ func (c *Client) ListSources(ctx context.Context, match *snapshot.SourceInfo) (*
}
// ListSnapshots lists the snapshots managed by the server for a given source filter.
func (c *Client) ListSnapshots(ctx context.Context, match *snapshot.SourceInfo) (*SnapshotsResponse, error) {
func ListSnapshots(ctx context.Context, c *apiclient.KopiaAPIClient, match *snapshot.SourceInfo) (*SnapshotsResponse, error) {
resp := &SnapshotsResponse{}
if err := c.Get(ctx, "snapshots"+matchSourceParameters(match), resp); err != nil {
if err := c.Get(ctx, "snapshots"+matchSourceParameters(match), nil, resp); err != nil {
return nil, err
}
@@ -92,9 +90,9 @@ func (c *Client) ListSnapshots(ctx context.Context, match *snapshot.SourceInfo)
}
// ListPolicies lists the policies managed by the server for a given target filter.
func (c *Client) ListPolicies(ctx context.Context, match *snapshot.SourceInfo) (*PoliciesResponse, error) {
func ListPolicies(ctx context.Context, c *apiclient.KopiaAPIClient, match *snapshot.SourceInfo) (*PoliciesResponse, error) {
resp := &PoliciesResponse{}
if err := c.Get(ctx, "policies"+matchSourceParameters(match), resp); err != nil {
if err := c.Get(ctx, "policies"+matchSourceParameters(match), nil, resp); err != nil {
return nil, err
}
@@ -102,31 +100,14 @@ func (c *Client) ListPolicies(ctx context.Context, match *snapshot.SourceInfo) (
}
// GetObject returns the object payload.
func (c *Client) GetObject(ctx context.Context, objectID string) ([]byte, error) {
req, err := http.NewRequest("GET", c.options.BaseURL+"objects/"+objectID, nil)
if err != nil {
func GetObject(ctx context.Context, c *apiclient.KopiaAPIClient, objectID string) ([]byte, error) {
var b []byte
if err := c.Get(ctx, "objects/"+objectID, object.ErrObjectNotFound, &b); err != nil {
return nil, err
}
if c.options.LogRequests {
log(ctx).Debugf("GET %v", req.URL)
}
if c.options.Username != "" {
req.SetBasicAuth(c.options.Username, c.options.Password)
}
resp, err := c.options.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("invalid server response: %v", resp.Status)
}
return ioutil.ReadAll(resp.Body)
return b, nil
}
func matchSourceParameters(match *snapshot.SourceInfo) string {

View File

@@ -0,0 +1,243 @@
package repo
import (
"context"
"encoding/hex"
"encoding/json"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo/object"
)
// APIServerInfo is remote repository configuration stored in local configuration.
type APIServerInfo struct {
BaseURL string `json:"url"`
TrustedServerCertificateFingerprint string `json:"serverCertFingerprint"`
}
// remoteRepository is an implementation of Repository that connects to an instance of
// API server hosted by `kopia server`, instead of directly manipulating files in the BLOB storage.
type apiServerRepository struct {
cli *apiclient.KopiaAPIClient
h hashing.HashFunc
omgr *object.Manager
username string
hostname string
}
func (r *apiServerRepository) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
return r.omgr.Open(ctx, id)
}
func (r *apiServerRepository) NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer {
return r.omgr.NewWriter(ctx, opt)
}
func (r *apiServerRepository) VerifyObject(ctx context.Context, id object.ID) ([]content.ID, error) {
return r.omgr.VerifyObject(ctx, id)
}
func (r *apiServerRepository) GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error) {
var mm remoterepoapi.ManifestWithMetadata
if err := r.cli.Get(ctx, "manifests/"+string(id), manifest.ErrNotFound, &mm); err != nil {
return nil, err
}
return mm.Metadata, json.Unmarshal(mm.Payload, data)
}
func (r *apiServerRepository) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) {
v, err := json.Marshal(payload)
if err != nil {
return "", errors.Wrap(err, "unable to marshal JSON")
}
req := &remoterepoapi.ManifestWithMetadata{
Payload: json.RawMessage(v),
Metadata: &manifest.EntryMetadata{
Labels: labels,
},
}
resp := &manifest.EntryMetadata{}
if err := r.cli.Post(ctx, "manifests", req, resp); err != nil {
return "", err
}
return resp.ID, nil
}
func (r *apiServerRepository) FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error) {
uv := make(url.Values)
for k, v := range labels {
uv.Add(k, v)
}
var mm []*manifest.EntryMetadata
if err := r.cli.Get(ctx, "manifests?"+uv.Encode(), nil, &mm); err != nil {
return nil, err
}
return mm, nil
}
func (r *apiServerRepository) DeleteManifest(ctx context.Context, id manifest.ID) error {
return r.cli.Delete(ctx, "manifests/"+string(id), nil, nil)
}
func (r *apiServerRepository) Hostname() string {
return r.hostname
}
func (r *apiServerRepository) Username() string {
return r.username
}
func (r *apiServerRepository) Time() time.Time {
return time.Now() // allow:no-inject-time
}
func (r *apiServerRepository) Refresh(ctx context.Context) error {
return nil
}
func (r *apiServerRepository) Flush(ctx context.Context) error {
return nil
}
func (r *apiServerRepository) Close(ctx context.Context) error {
if err := r.omgr.Close(); err != nil {
return errors.Wrap(err, "error closing object manager")
}
return nil
}
func (r *apiServerRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) {
var bi content.Info
if err := r.cli.Get(ctx, "contents/"+string(contentID)+"?info=1", content.ErrContentNotFound, &bi); err != nil {
return content.Info{}, err
}
return bi, nil
}
func (r *apiServerRepository) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) {
var result []byte
if err := r.cli.Get(ctx, "contents/"+string(contentID), content.ErrContentNotFound, &result); err != nil {
return nil, err
}
return result, nil
}
func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) {
if err := content.ValidatePrefix(prefix); err != nil {
return "", err
}
var hashOutput [128]byte
contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data)))
if err := r.cli.Put(ctx, "contents/"+string(contentID), data, nil); err != nil {
return "", err
}
return contentID, nil
}
var _ Repository = (*apiServerRepository)(nil)
// openAPIServer connects remote repository over Kopia API.
func openAPIServer(ctx context.Context, si *APIServerInfo, username, hostname, password string) (Repository, error) {
cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{
BaseURL: si.BaseURL,
TrustedServerCertificateFingerprint: si.TrustedServerCertificateFingerprint,
Username: username + "@" + hostname,
Password: password,
LogRequests: true,
})
if err != nil {
return nil, errors.Wrap(err, "unable to create API client")
}
rr := &apiServerRepository{
cli: cli,
username: username,
hostname: hostname,
}
var p remoterepoapi.Parameters
if err = cli.Get(ctx, "repo/parameters", nil, &p); err != nil {
return nil, errors.Wrap(err, "unable to get repository parameters")
}
hf, err := hashing.CreateHashFunc(&p)
if err != nil {
return nil, errors.Wrap(err, "unable to create hash function")
}
rr.h = hf
// create object manager using rr as contentManager implementation.
omgr, err := object.NewObjectManager(ctx, rr, p.Format, object.ManagerOptions{})
if err != nil {
return nil, errors.Wrap(err, "error initializing object manager")
}
rr.omgr = omgr
return rr, nil
}
// ConnectAPIServer sets up repository connection to a particular API server.
func ConnectAPIServer(ctx context.Context, configFile string, si *APIServerInfo, password string, opt *ConnectOptions) error {
lc := LocalConfig{
APIServer: si,
Hostname: opt.HostnameOverride,
Username: opt.UsernameOverride,
}
if lc.Hostname == "" {
lc.Hostname = getDefaultHostName(ctx)
}
if lc.Username == "" {
lc.Username = getDefaultUserName(ctx)
}
d, err := json.MarshalIndent(&lc, "", " ")
if err != nil {
return err
}
if err = os.MkdirAll(filepath.Dir(configFile), 0700); err != nil {
return errors.Wrap(err, "unable to create config directory")
}
if err = ioutil.WriteFile(configFile, d, 0600); err != nil {
return errors.Wrap(err, "unable to write config file")
}
return verifyConnect(ctx, configFile, password, opt.PersistCredentials)
}

View File

@@ -49,7 +49,9 @@ func Connect(ctx context.Context, configFile string, st blob.Storage, password s
}
var lc LocalConfig
lc.Storage = st.ConnectionInfo()
ci := st.ConnectionInfo()
lc.Storage = &ci
lc.Hostname = opt.HostnameOverride
if lc.Hostname == "" {
@@ -78,6 +80,10 @@ func Connect(ctx context.Context, configFile string, st blob.Storage, password s
return errors.Wrap(err, "unable to write config file")
}
return verifyConnect(ctx, configFile, password, opt.PersistCredentials)
}
func verifyConnect(ctx context.Context, configFile, password string, persist bool) error {
// now verify that the repository can be opened with the provided config file.
r, err := Open(ctx, configFile, password, nil)
if err != nil {
@@ -90,7 +96,7 @@ func Connect(ctx context.Context, configFile string, st blob.Storage, password s
return err
}
if opt.PersistCredentials {
if persist {
if err := persistPassword(ctx, configFile, password); err != nil {
return errors.Wrap(err, "unable to persist password")
}
@@ -103,10 +109,14 @@ func Connect(ctx context.Context, configFile string, st blob.Storage, password s
func setupCaching(ctx context.Context, configPath string, lc *LocalConfig, opt content.CachingOptions, uniqueID []byte) error {
if opt.MaxCacheSizeBytes == 0 {
lc.Caching = content.CachingOptions{}
lc.Caching = &content.CachingOptions{}
return nil
}
if lc.Caching == nil {
lc.Caching = &content.CachingOptions{}
}
if opt.CacheDirectory == "" {
cacheDir, err := os.UserCacheDir()
if err != nil {
@@ -155,7 +165,7 @@ func Disconnect(ctx context.Context, configFile string) error {
deletePassword(ctx, configFile)
if cfg.Caching.CacheDirectory != "" {
if cfg.Caching != nil && cfg.Caching.CacheDirectory != "" {
if err = os.RemoveAll(cfg.Caching.CacheDirectory); err != nil {
log(ctx).Warningf("unable to remove cache directory: %v", err)
}

View File

@@ -529,7 +529,7 @@ func (bm *Manager) WriteContent(ctx context.Context, data []byte, prefix ID) (ID
stats.Record(ctx, metricContentWriteContentCount.M(1))
stats.Record(ctx, metricContentWriteContentBytes.M(int64(len(data))))
if err := validatePrefix(prefix); err != nil {
if err := ValidatePrefix(prefix); err != nil {
return "", err
}

View File

@@ -202,7 +202,8 @@ func (bm *lockFreeManager) unprocessedIndexBlobsUnlocked(ctx context.Context, co
return ch, totalSize, nil
}
func validatePrefix(prefix ID) error {
// ValidatePrefix returns an error if a given prefix is invalid.
func ValidatePrefix(prefix ID) error {
switch len(prefix) {
case 0:
return nil

View File

@@ -12,10 +12,16 @@
// LocalConfig is a configuration of Kopia stored in a configuration file.
type LocalConfig struct {
Storage blob.ConnectionInfo `json:"storage"`
Caching content.CachingOptions `json:"caching"`
Hostname string `json:"hostname"`
Username string `json:"username"`
// APIServer is only provided for remote repository.
APIServer *APIServerInfo `json:"apiServer,omitempty"`
// Storage is only provided for direct repository access.
Storage *blob.ConnectionInfo `json:"storage,omitempty"`
Caching *content.CachingOptions `json:"caching,omitempty"`
Hostname string `json:"hostname"`
Username string `json:"username"`
}
// repositoryObjectFormat describes the format of objects in a repository.

View File

@@ -5,8 +5,8 @@
// EntryMetadata contains metadata about manifest item. Each manifest item has one or more labels
// Including required "type" label.
type EntryMetadata struct {
ID ID
Length int
Labels map[string]string
ModTime time.Time
ID ID `json:"id"`
Length int `json:"length"`
Labels map[string]string `json:"labels"`
ModTime time.Time `json:"mtime"`
}

View File

@@ -32,7 +32,7 @@ type Options struct {
var ErrInvalidPassword = errors.Errorf("invalid repository password")
// Open opens a Repository specified in the configuration file.
func Open(ctx context.Context, configFile, password string, options *Options) (rep *DirectRepository, err error) {
func Open(ctx context.Context, configFile, password string, options *Options) (rep Repository, err error) {
defer func() {
if err != nil {
log(ctx).Errorf("failed to open repository: %v", err)
@@ -53,11 +53,24 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r
return nil, err
}
if lc.APIServer != nil {
return openAPIServer(ctx, lc.APIServer, lc.Username, lc.Hostname, password)
}
return openDirect(ctx, configFile, lc, password, options)
}
// openDirect opens the repository that directly manipulates blob storage..
func openDirect(ctx context.Context, configFile string, lc *LocalConfig, password string, options *Options) (rep *DirectRepository, err error) {
if lc.Caching.CacheDirectory != "" && !filepath.IsAbs(lc.Caching.CacheDirectory) {
lc.Caching.CacheDirectory = filepath.Join(filepath.Dir(configFile), lc.Caching.CacheDirectory)
}
st, err := blob.NewStorage(ctx, lc.Storage)
if lc.Storage == nil {
return nil, errors.Errorf("storage not set in the configuration file")
}
st, err := blob.NewStorage(ctx, *lc.Storage)
if err != nil {
return nil, errors.Wrap(err, "cannot open storage")
}
@@ -66,7 +79,7 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r
st = loggingwrapper.NewWrapper(st, options.TraceStorage, "[STORAGE] ")
}
r, err := OpenWithConfig(ctx, st, lc, password, options, lc.Caching)
r, err := OpenWithConfig(ctx, st, lc, password, options, *lc.Caching)
if err != nil {
st.Close(ctx) //nolint:errcheck
return nil, err

View File

@@ -0,0 +1,92 @@
package endtoend_test
import (
"io/ioutil"
"path/filepath"
"testing"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/tests/testenv"
)
// foo@bar - password baz
var htpasswdFileContents = []byte("foo@bar:$2y$05$JWrExvBe5Knh0.AMLk5WHu.EzfOP.LhrqMIRf1YseZ/rulBjKqGJ.\n")
func TestAPIServerRepository(t *testing.T) {
ctx := testlogging.Context(t)
e := testenv.NewCLITest(t)
defer e.Cleanup(t)
defer e.RunAndExpectSuccess(t, "repo", "disconnect")
// create one snapshot as foo@bar
e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir, "--override-username", "foo", "--override-hostname", "bar")
e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1)
e1 := testenv.NewCLITest(t)
defer e1.Cleanup(t)
defer e1.RunAndExpectSuccess(t, "repo", "disconnect")
// create one snapshot as not-foo@bar
e1.RunAndExpectSuccess(t, "repo", "connect", "filesystem", "--path", e.RepoDir, "--override-username", "not-foo", "--override-hostname", "bar")
e1.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1)
htpasswordFile := filepath.Join(e.ConfigDir, "htpasswd.txt")
ioutil.WriteFile(htpasswordFile, htpasswdFileContents, 0755)
var sp serverParameters
e.RunAndProcessStderr(t, sp.ProcessOutput,
"server", "start",
"--address=localhost:0",
"--random-password",
"--tls-generate-cert",
"--auto-shutdown=60s",
"--htpasswd-file", htpasswordFile,
)
t.Logf("detected server parameters %#v", sp)
cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{
BaseURL: sp.baseURL,
Username: "foo@bar",
Password: "baz",
TrustedServerCertificateFingerprint: sp.sha256Fingerprint,
LogRequests: true,
})
if err != nil {
t.Fatalf("unable to create API apiclient")
}
defer serverapi.Shutdown(ctx, cli) // nolint:errcheck
waitUntilServerStarted(ctx, t, cli)
e2 := testenv.NewCLITest(t)
defer e2.Cleanup(t)
defer e2.RunAndExpectSuccess(t, "repo", "disconnect")
e2.RunAndExpectSuccess(t, "repo", "connect", "server",
"--url", sp.baseURL,
"--server-cert-fingerprint", sp.sha256Fingerprint,
"--override-username", "foo",
"--override-hostname", "bar",
"--password", "baz",
)
// should see one snapshot
snapshots := e2.ListSnapshotsAndExpectSuccess(t)
if got, want := len(snapshots), 1; got != want {
t.Errorf("invalid number of snapshots for foo@bar")
}
// create snapshot using remote repository client
e2.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir2)
// now should see two snapshots
snapshots = e2.ListSnapshotsAndExpectSuccess(t)
if got, want := len(snapshots), 2; got != want {
t.Errorf("invalid number of snapshots for foo@bar")
}
}

View File

@@ -12,6 +12,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/retry"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/internal/testlogging"
@@ -79,17 +80,18 @@ func TestServerStart(t *testing.T) {
)
t.Logf("detected server parameters %#v", sp)
cli, err := serverapi.NewClient(serverapi.ClientOptions{
cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{
BaseURL: sp.baseURL,
Username: "kopia",
Password: sp.password,
TrustedServerCertificateFingerprint: sp.sha256Fingerprint,
LogRequests: true,
})
if err != nil {
t.Fatalf("unable to create API client")
t.Fatalf("unable to create API apiclient")
}
defer cli.Shutdown(ctx) // nolint:errcheck
defer serverapi.Shutdown(ctx, cli) // nolint:errcheck
waitUntilServerStarted(ctx, t, cli)
verifyUIServedWithCorrectTitle(t, cli, sp)
@@ -104,7 +106,7 @@ func TestServerStart(t *testing.T) {
t.Errorf("unexpected source path: %v, want %v", got, want)
}
createResp, err := cli.CreateSnapshotSource(ctx, &serverapi.CreateSnapshotSourceRequest{
createResp, err := serverapi.CreateSnapshotSource(ctx, cli, &serverapi.CreateSnapshotSourceRequest{
Path: sharedTestDataDir2,
})
@@ -132,13 +134,13 @@ func TestServerStart(t *testing.T) {
uploadMatchingSnapshots(t, cli, &snapshot.SourceInfo{Host: "fake-hostname", UserName: "fake-username", Path: sharedTestDataDir2})
waitForSnapshotCount(ctx, t, cli, &snapshot.SourceInfo{Host: "fake-hostname", UserName: "fake-username", Path: sharedTestDataDir2}, 1)
if _, err = cli.CancelUpload(ctx, nil); err != nil {
if _, err = serverapi.CancelUpload(ctx, cli, nil); err != nil {
t.Fatalf("cancel failed: %v", err)
}
snaps := verifySnapshotCount(t, cli, &snapshot.SourceInfo{Host: "fake-hostname", UserName: "fake-username", Path: sharedTestDataDir2}, 1)
rootPayload, err := cli.GetObject(ctx, snaps[0].RootEntry)
rootPayload, err := serverapi.GetObject(ctx, cli, snaps[0].RootEntry)
if err != nil {
t.Fatalf("getObject %v", err)
}
@@ -151,7 +153,7 @@ func TestServerStart(t *testing.T) {
keepDaily := 77
createResp, err = cli.CreateSnapshotSource(ctx, &serverapi.CreateSnapshotSourceRequest{
createResp, err = serverapi.CreateSnapshotSource(ctx, cli, &serverapi.CreateSnapshotSourceRequest{
Path: sharedTestDataDir3,
CreateSnapshot: true,
InitialPolicy: policy.Policy{
@@ -169,7 +171,7 @@ func TestServerStart(t *testing.T) {
t.Errorf("unexpected value of 'snapshotStarted': %v", createResp.SnapshotStarted)
}
policies, err := cli.ListPolicies(ctx, &snapshot.SourceInfo{Host: "fake-hostname", UserName: "fake-username", Path: sharedTestDataDir3})
policies, err := serverapi.ListPolicies(ctx, cli, &snapshot.SourceInfo{Host: "fake-hostname", UserName: "fake-username", Path: sharedTestDataDir3})
if err != nil {
t.Errorf("aaa")
}
@@ -206,21 +208,22 @@ func TestServerStartWithoutInitialRepository(t *testing.T) {
e.RunAndProcessStderr(t, sp.ProcessOutput, "server", "start", "--ui", "--address=localhost:0", "--random-password", "--tls-generate-cert", "--auto-shutdown=60s")
t.Logf("detected server parameters %#v", sp)
cli, err := serverapi.NewClient(serverapi.ClientOptions{
cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{
BaseURL: sp.baseURL,
Username: "kopia",
Password: sp.password,
TrustedServerCertificateFingerprint: sp.sha256Fingerprint,
})
if err != nil {
t.Fatalf("unable to create API client")
t.Fatalf("unable to create API apiclient")
}
defer cli.Shutdown(ctx) // nolint:errcheck
defer serverapi.Shutdown(ctx, cli) // nolint:errcheck
waitUntilServerStarted(ctx, t, cli)
verifyServerConnected(t, cli, false)
if err = cli.CreateRepository(ctx, &serverapi.CreateRepositoryRequest{
if err = serverapi.CreateRepository(ctx, cli, &serverapi.CreateRepositoryRequest{
ConnectRepositoryRequest: serverapi.ConnectRepositoryRequest{
Password: "foofoo",
Storage: connInfo,
@@ -231,13 +234,13 @@ func TestServerStartWithoutInitialRepository(t *testing.T) {
verifyServerConnected(t, cli, true)
if err = cli.DisconnectFromRepository(ctx); err != nil {
if err = serverapi.DisconnectFromRepository(ctx, cli); err != nil {
t.Fatalf("disconnect error: %v", err)
}
verifyServerConnected(t, cli, false)
if err = cli.ConnectToRepository(ctx, &serverapi.ConnectRepositoryRequest{
if err = serverapi.ConnectToRepository(ctx, cli, &serverapi.ConnectRepositoryRequest{
Password: "foofoo",
Storage: connInfo,
}); err != nil {
@@ -247,10 +250,10 @@ func TestServerStartWithoutInitialRepository(t *testing.T) {
verifyServerConnected(t, cli, true)
}
func verifyServerConnected(t *testing.T, cli *serverapi.Client, want bool) *serverapi.StatusResponse {
func verifyServerConnected(t *testing.T, cli *apiclient.KopiaAPIClient, want bool) *serverapi.StatusResponse {
t.Helper()
st, err := cli.Status(testlogging.Context(t))
st, err := serverapi.Status(testlogging.Context(t), cli)
if err != nil {
t.Fatalf("status error: %v", err)
}
@@ -262,11 +265,11 @@ func verifyServerConnected(t *testing.T, cli *serverapi.Client, want bool) *serv
return st
}
func waitForSnapshotCount(ctx context.Context, t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) {
func waitForSnapshotCount(ctx context.Context, t *testing.T, cli *apiclient.KopiaAPIClient, match *snapshot.SourceInfo, want int) {
t.Helper()
err := retry.PeriodicallyNoValue(ctx, 1*time.Second, 60, "wait for snapshots", func() error {
snapshots, err := cli.ListSnapshots(testlogging.Context(t), match)
snapshots, err := serverapi.ListSnapshots(testlogging.Context(t), cli, match)
if err != nil {
return errors.Wrap(err, "error listing sources")
}
@@ -282,18 +285,18 @@ func waitForSnapshotCount(ctx context.Context, t *testing.T, cli *serverapi.Clie
}
}
func uploadMatchingSnapshots(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo) {
func uploadMatchingSnapshots(t *testing.T, cli *apiclient.KopiaAPIClient, match *snapshot.SourceInfo) {
t.Helper()
if _, err := cli.UploadSnapshots(testlogging.Context(t), match); err != nil {
if _, err := serverapi.UploadSnapshots(testlogging.Context(t), cli, match); err != nil {
t.Fatalf("upload failed: %v", err)
}
}
func verifySnapshotCount(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) []*serverapi.Snapshot {
func verifySnapshotCount(t *testing.T, cli *apiclient.KopiaAPIClient, match *snapshot.SourceInfo, want int) []*serverapi.Snapshot {
t.Helper()
snapshots, err := cli.ListSnapshots(testlogging.Context(t), match)
snapshots, err := serverapi.ListSnapshots(testlogging.Context(t), cli, match)
if err != nil {
t.Fatalf("error listing sources: %v", err)
}
@@ -305,10 +308,10 @@ func verifySnapshotCount(t *testing.T, cli *serverapi.Client, match *snapshot.So
return snapshots.Snapshots
}
func verifySourceCount(t *testing.T, cli *serverapi.Client, match *snapshot.SourceInfo, want int) []*serverapi.SourceStatus {
func verifySourceCount(t *testing.T, cli *apiclient.KopiaAPIClient, match *snapshot.SourceInfo, want int) []*serverapi.SourceStatus {
t.Helper()
sources, err := cli.ListSources(testlogging.Context(t), match)
sources, err := serverapi.ListSources(testlogging.Context(t), cli, match)
if err != nil {
t.Fatalf("error listing sources: %v", err)
}
@@ -328,7 +331,7 @@ func verifySourceCount(t *testing.T, cli *serverapi.Client, match *snapshot.Sour
return sources.Sources
}
func verifyUIServedWithCorrectTitle(t *testing.T, cli *serverapi.Client, sp serverParameters) {
func verifyUIServedWithCorrectTitle(t *testing.T, cli *apiclient.KopiaAPIClient, sp serverParameters) {
req, err := http.NewRequest("GET", sp.baseURL, nil)
if err != nil {
t.Fatalf("unable to create HTTP request: %v", err)
@@ -336,7 +339,7 @@ func verifyUIServedWithCorrectTitle(t *testing.T, cli *serverapi.Client, sp serv
req.SetBasicAuth("kopia", sp.password)
resp, err := cli.HTTPClient().Do(req)
resp, err := cli.HTTPClient.Do(req)
if err != nil {
t.Fatalf("unable to get HTML root: %v", err)
}
@@ -355,9 +358,9 @@ func verifyUIServedWithCorrectTitle(t *testing.T, cli *serverapi.Client, sp serv
}
}
func waitUntilServerStarted(ctx context.Context, t *testing.T, cli *serverapi.Client) {
func waitUntilServerStarted(ctx context.Context, t *testing.T, cli *apiclient.KopiaAPIClient) {
if err := retry.PeriodicallyNoValue(ctx, 1*time.Second, 60, "wait for server start", func() error {
_, err := cli.Status(testlogging.Context(t))
_, err := serverapi.Status(testlogging.Context(t), cli)
return err
}, retry.Always); err != nil {
t.Fatalf("server failed to start")

View File

@@ -148,7 +148,7 @@ func longLivedRepositoryTest(ctx context.Context, t *testing.T, cancel chan stru
go func() {
defer wg2.Done()
repositoryTest(ctx, t, cancel, rep)
repositoryTest(ctx, t, cancel, rep.(*repo.DirectRepository))
}()
}