chore(repository): BREAKING CHANGE remove support for HTTP-based repository API (#3745)

Remove support for HTTP-based repository API
This commit is contained in:
Jarek Kowalski
2024-04-17 16:23:58 -07:00
committed by GitHub
parent 2d31c7b2d7
commit 211e28c98c
15 changed files with 19 additions and 842 deletions

View File

@@ -15,7 +15,6 @@ type commandRepositoryConnectServer struct {
connectAPIServerURL string
connectAPIServerCertFingerprint string
connectAPIServerUseGRPCAPI bool
svc advancedAppServices
out textOutput
@@ -29,7 +28,6 @@ func (c *commandRepositoryConnectServer) setup(svc advancedAppServices, parent c
cmd := parent.Command("server", "Connect to a repository API Server.")
cmd.Flag("url", "Server URL").Required().StringVar(&c.connectAPIServerURL)
cmd.Flag("server-cert-fingerprint", "Server certificate fingerprint").StringVar(&c.connectAPIServerCertFingerprint)
cmd.Flag("grpc", "Use GRPC API").Default("true").BoolVar(&c.connectAPIServerUseGRPCAPI)
cmd.Action(svc.noRepositoryAction(c.run))
}
@@ -37,7 +35,6 @@ func (c *commandRepositoryConnectServer) run(ctx context.Context) error {
as := &repo.APIServerInfo{
BaseURL: strings.TrimSuffix(c.connectAPIServerURL, "/"),
TrustedServerCertificateFingerprint: strings.ToLower(c.connectAPIServerCertFingerprint),
DisableGRPC: !c.connectAPIServerUseGRPCAPI,
}
configFile := c.svc.repositoryConfigFileName()

View File

@@ -31,10 +31,9 @@ type commandServerStart struct {
serverStartHTMLPath string
serverStartUI bool
serverStartLegacyRepositoryAPI bool
serverStartGRPC bool
serverStartControlAPI bool
serverStartUI bool
serverStartGRPC bool
serverStartControlAPI bool
serverStartRefreshInterval time.Duration
serverStartInsecure bool
@@ -82,7 +81,6 @@ func (c *commandServerStart) setup(svc advancedAppServices, parent commandParent
cmd.Flag("html", "Server the provided HTML at the root URL").ExistingDirVar(&c.serverStartHTMLPath)
cmd.Flag("ui", "Start the server with HTML UI").Default("true").BoolVar(&c.serverStartUI)
cmd.Flag("legacy-api", "Start the legacy server API").Default("false").BoolVar(&c.serverStartLegacyRepositoryAPI)
cmd.Flag("grpc", "Start the GRPC server").Default("true").BoolVar(&c.serverStartGRPC)
cmd.Flag("control-api", "Start the control API").Default("true").BoolVar(&c.serverStartControlAPI)
@@ -276,10 +274,6 @@ func shutdownHTTPServer(ctx context.Context, httpServer *http.Server) {
}
func (c *commandServerStart) setupHandlers(srv *server.Server, m *mux.Router) {
if c.serverStartLegacyRepositoryAPI {
srv.SetupRepositoryAPIHandlers(m)
}
if c.serverStartControlAPI {
srv.SetupControlAPIHandlers(m)
}

View File

@@ -1,54 +0,0 @@
// Package remoterepoapi contains requests and responses for remote repository API.
package remoterepoapi
import (
"encoding/json"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/manifest"
)
// Parameters encapsulates all parameters for repository.
// returned by /api/v1/repo/parameters.
type Parameters struct {
HashFunction string `json:"hash"`
HMACSecret []byte `json:"hmacSecret"`
SupportsContentCompression bool `json:"supportsContentCompression"`
format.ObjectFormat
}
// GetHashFunction returns the name of the hash function for remote repository.
func (p *Parameters) GetHashFunction() string { return p.HashFunction }
// GetHmacSecret returns the HMAC secret for the remote repository.
func (p *Parameters) GetHmacSecret() []byte { return p.HMACSecret }
// ManifestWithMetadata represents manifest payload and metadata.
type ManifestWithMetadata struct {
Payload json.RawMessage `json:"payload"`
Metadata *manifest.EntryMetadata `json:"metadata"`
}
// PrefetchContentsRequest represents a request to prefetch contents.
type PrefetchContentsRequest struct {
ContentIDs []content.ID `json:"contents"`
Hint string `json:"hint"`
}
// PrefetchContentsResponse represents a response from request to prefetch contents.
type PrefetchContentsResponse struct {
ContentIDs []content.ID `json:"contents"`
}
// ApplyRetentionPolicyRequest represents a request to apply retention policy to a given source path.
type ApplyRetentionPolicyRequest struct {
SourcePath string `json:"sourcePath"`
ReallyDelete bool `json:"reallyDelete"`
}
// ApplyRetentionPolicyResponse represents a response to a request to apply retention policy.
type ApplyRetentionPolicyResponse struct {
ManifestIDs []manifest.ID `json:"manifests"`
}

View File

@@ -1,118 +0,0 @@
package server
import (
"context"
"encoding/json"
"errors"
"strconv"
"strings"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/manifest"
)
func handleContentGet(ctx context.Context, rc requestContext) (interface{}, *apiError) {
dr, ok := rc.rep.(repo.DirectRepository)
if !ok {
return nil, notFoundError("content not found")
}
cid, err := content.ParseID(rc.muxVar("contentID"))
if err != nil {
return nil, notFoundError("content not found")
}
data, err := dr.ContentReader().GetContent(ctx, cid)
switch {
case err == nil:
return data, nil
case errors.Is(err, content.ErrContentNotFound):
return nil, notFoundError("content not found")
default:
return nil, internalServerError(err)
}
}
func handleContentInfo(ctx context.Context, rc requestContext) (interface{}, *apiError) {
cid, err := content.ParseID(rc.muxVar("contentID"))
if err != nil {
return nil, notFoundError("content not found")
}
ci, err := rc.rep.ContentInfo(ctx, cid)
switch {
case err == nil:
return ci, nil
case errors.Is(err, content.ErrContentNotFound):
return nil, notFoundError("content not found")
default:
return nil, internalServerError(err)
}
}
func handleContentPut(ctx context.Context, rc requestContext) (interface{}, *apiError) {
dr, ok := rc.rep.(repo.DirectRepositoryWriter)
if !ok {
return nil, repositoryNotWritableError()
}
cid, cerr := content.ParseID(rc.muxVar("contentID"))
if cerr != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed content ID")
}
prefix := cid.Prefix()
if strings.HasPrefix(string(prefix), manifest.ContentPrefix) {
// it's not allowed to create contents prefixed with 'm' since those could be mistaken for manifest contents.
return nil, accessDeniedError()
}
var comp compression.HeaderID
if c := rc.queryParam("compression"); c != "" {
v, err := strconv.ParseInt(c, 16, 32)
if err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed compression ID")
}
comp = compression.HeaderID(v)
if _, ok := compression.ByHeaderID[comp]; !ok {
return nil, requestError(serverapi.ErrorMalformedRequest, "invalid compression ID")
}
}
actualCID, err := dr.ContentManager().WriteContent(ctx, gather.FromSlice(rc.body), prefix, comp)
if err != nil {
return nil, internalServerError(err)
}
if actualCID != cid {
return nil, requestError(serverapi.ErrorMalformedRequest, "mismatched content ID")
}
return &serverapi.Empty{}, nil
}
func handleContentPrefetch(ctx context.Context, rc requestContext) (interface{}, *apiError) {
var req remoterepoapi.PrefetchContentsRequest
if err := json.Unmarshal(rc.body, &req); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request")
}
return &remoterepoapi.PrefetchContentsResponse{
ContentIDs: rc.rep.PrefetchContents(ctx, req.ContentIDs, req.Hint),
}, nil
}

View File

@@ -1,172 +0,0 @@
package server
import (
"context"
"encoding/json"
"strings"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/auth"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
)
func handleManifestGet(ctx context.Context, rc requestContext) (interface{}, *apiError) {
mid := manifest.ID(rc.muxVar("manifestID"))
var data json.RawMessage
md, err := rc.rep.GetManifest(ctx, mid, &data)
if errors.Is(err, manifest.ErrNotFound) {
return nil, notFoundError("manifest not found")
}
if err != nil {
return nil, internalServerError(err)
}
if !hasManifestAccess(ctx, rc, md.Labels, auth.AccessLevelRead) {
return nil, accessDeniedError()
}
return &remoterepoapi.ManifestWithMetadata{
Payload: data,
Metadata: md,
}, nil
}
func handleManifestDelete(ctx context.Context, rc requestContext) (interface{}, *apiError) {
rw, ok := rc.rep.(repo.RepositoryWriter)
if !ok {
return nil, repositoryNotWritableError()
}
mid := manifest.ID(rc.muxVar("manifestID"))
var data json.RawMessage
em, err := rc.rep.GetManifest(ctx, mid, &data)
if errors.Is(err, manifest.ErrNotFound) {
return nil, notFoundError("manifest not found")
}
if err != nil {
return nil, internalServerError(err)
}
if !hasManifestAccess(ctx, rc, em.Labels, auth.AccessLevelFull) {
return nil, accessDeniedError()
}
err = rw.DeleteManifest(ctx, mid)
if errors.Is(err, manifest.ErrNotFound) {
return nil, notFoundError("manifest not found")
}
if err != nil {
return nil, internalServerError(err)
}
return &serverapi.Empty{}, nil
}
func handleManifestList(ctx context.Context, rc requestContext) (interface{}, *apiError) {
// password already validated by a wrapper, no need to check here.
labels := map[string]string{}
for k, v := range rc.req.URL.Query() {
labels[k] = v[0]
}
m, err := rc.rep.FindManifests(ctx, labels)
if err != nil {
return nil, internalServerError(err)
}
return filterManifests(m, httpAuthorizationInfo(ctx, rc)), nil
}
func filterManifests(manifests []*manifest.EntryMetadata, authz auth.AuthorizationInfo) []*manifest.EntryMetadata {
result := []*manifest.EntryMetadata{}
for _, m := range manifests {
if authz.ManifestAccessLevel(m.Labels) >= auth.AccessLevelRead {
result = append(result, m)
}
}
return result
}
func handleManifestCreate(ctx context.Context, rc requestContext) (interface{}, *apiError) {
rw, ok := rc.rep.(repo.RepositoryWriter)
if !ok {
return nil, repositoryNotWritableError()
}
var req remoterepoapi.ManifestWithMetadata
if err := json.Unmarshal(rc.body, &req); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request")
}
if !hasManifestAccess(ctx, rc, req.Metadata.Labels, auth.AccessLevelAppend) {
return nil, accessDeniedError()
}
id, err := rw.PutManifest(ctx, req.Metadata.Labels, req.Payload)
if err != nil {
return nil, internalServerError(err)
}
return &manifest.EntryMetadata{ID: id}, nil
}
func handleApplyRetentionPolicy(ctx context.Context, rc requestContext) (interface{}, *apiError) {
rw, ok := rc.rep.(repo.RepositoryWriter)
if !ok {
return nil, repositoryNotWritableError()
}
var req remoterepoapi.ApplyRetentionPolicyRequest
if err := json.Unmarshal(rc.body, &req); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request")
}
usernameAtHostname, _, _ := rc.req.BasicAuth()
parts := strings.Split(usernameAtHostname, "@")
if len(parts) != 2 { //nolint:gomnd
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed username")
}
// only allow users to apply retention policy if they have permission to add snapshots
// for a particular path.
if !hasManifestAccess(ctx, rc, map[string]string{
manifest.TypeLabelKey: snapshot.ManifestType,
snapshot.UsernameLabel: parts[0],
snapshot.HostnameLabel: parts[1],
snapshot.PathLabel: req.SourcePath,
}, auth.AccessLevelAppend) {
return nil, accessDeniedError()
}
ids, err := policy.ApplyRetentionPolicy(ctx, rw, snapshot.SourceInfo{
UserName: parts[0],
Host: parts[1],
Path: req.SourcePath,
}, req.ReallyDelete)
if err != nil {
return nil, internalServerError(err)
}
return &remoterepoapi.ApplyRetentionPolicyResponse{
ManifestIDs: ids,
}, nil
}

View File

@@ -10,7 +10,6 @@
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/passwordpersist"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
@@ -27,24 +26,6 @@
const syncConnectWaitTime = 5 * time.Second
func handleRepoParameters(ctx context.Context, rc requestContext) (interface{}, *apiError) {
dr, ok := rc.rep.(repo.DirectRepository)
if !ok {
return &serverapi.StatusResponse{
Connected: false,
}, nil
}
rp := &remoterepoapi.Parameters{
HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(),
HMACSecret: dr.ContentReader().ContentFormat().GetHmacSecret(),
ObjectFormat: dr.ObjectFormat(),
SupportsContentCompression: dr.ContentReader().SupportsContentCompression(),
}
return rp, nil
}
func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *apiError) {
if rc.rep == nil {
return &serverapi.StatusResponse{

View File

@@ -161,23 +161,6 @@ func (s *Server) SetupHTMLUIAPIHandlers(m *mux.Router) {
m.HandleFunc("/api/v1/tasks/{taskID}/cancel", s.handleUIPossiblyNotConnected(handleTaskCancel)).Methods(http.MethodPost)
}
// SetupRepositoryAPIHandlers registers HTTP repository API handlers.
func (s *Server) SetupRepositoryAPIHandlers(m *mux.Router) {
m.HandleFunc("/api/v1/flush", s.handleRepositoryAPI(anyAuthenticatedUser, handleFlush)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/repo/parameters", s.handleRepositoryAPI(anyAuthenticatedUser, handleRepoParameters)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/contents/{contentID}", s.handleRepositoryAPI(requireContentAccess(auth.AccessLevelRead), handleContentInfo)).Methods(http.MethodGet).Queries("info", "1")
m.HandleFunc("/api/v1/contents/{contentID}", s.handleRepositoryAPI(requireContentAccess(auth.AccessLevelRead), handleContentGet)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/contents/{contentID}", s.handleRepositoryAPI(requireContentAccess(auth.AccessLevelAppend), handleContentPut)).Methods(http.MethodPut)
m.HandleFunc("/api/v1/contents/prefetch", s.handleRepositoryAPI(requireContentAccess(auth.AccessLevelRead), handleContentPrefetch)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/manifests/{manifestID}", s.handleRepositoryAPI(handlerWillCheckAuthorization, handleManifestGet)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/manifests/{manifestID}", s.handleRepositoryAPI(handlerWillCheckAuthorization, handleManifestDelete)).Methods(http.MethodDelete)
m.HandleFunc("/api/v1/manifests", s.handleRepositoryAPI(handlerWillCheckAuthorization, handleManifestCreate)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/manifests", s.handleRepositoryAPI(handlerWillCheckAuthorization, handleManifestList)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/policies/apply-retention", s.handleRepositoryAPI(handlerWillCheckAuthorization, handleApplyRetentionPolicy)).Methods(http.MethodPost)
}
// SetupControlAPIHandlers registers control API handlers.
func (s *Server) SetupControlAPIHandlers(m *mux.Router) {
// server control API, requires authentication as `server-control` and no CSRF token.
@@ -317,18 +300,6 @@ func (s *Server) requireAuth(checkCSRFToken csrfTokenOption, f func(ctx context.
}
}
func httpAuthorizationInfo(ctx context.Context, rc requestContext) auth.AuthorizationInfo {
// authentication already done
userAtHost, _, _ := rc.req.BasicAuth()
authz := rc.srv.getAuthorizer().Authorize(ctx, rc.rep, userAtHost)
if authz == nil {
authz = auth.NoAccess()
}
return authz
}
type isAuthorizedFunc func(ctx context.Context, rc requestContext) bool
func (s *Server) handleServerControlAPI(f apiRequestFunc) http.HandlerFunc {
@@ -347,16 +318,6 @@ func (s *Server) handleServerControlAPIPossiblyNotConnected(f apiRequestFunc) ht
})
}
func (s *Server) handleRepositoryAPI(isAuthorized isAuthorizedFunc, f apiRequestFunc) http.HandlerFunc {
return s.handleRequestPossiblyNotConnected(isAuthorized, csrfTokenNotRequired, func(ctx context.Context, rc requestContext) (interface{}, *apiError) {
if rc.rep == nil {
return nil, requestError(serverapi.ErrorNotConnected, "not connected")
}
return f(ctx, rc)
})
}
func (s *Server) handleUI(f apiRequestFunc) http.HandlerFunc {
return s.handleRequestPossiblyNotConnected(requireUIUser, csrfTokenRequired, func(ctx context.Context, rc requestContext) (interface{}, *apiError) {
if rc.rep == nil {

View File

@@ -10,7 +10,6 @@
"net/http"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/auth"
)
// kopiaSessionCookie is the name of the session cookie that Kopia server will generate for all
@@ -95,16 +94,6 @@ func handlerWillCheckAuthorization(ctx context.Context, _ requestContext) bool {
return true
}
func requireContentAccess(level auth.AccessLevel) isAuthorizedFunc {
return func(ctx context.Context, rc requestContext) bool {
return httpAuthorizationInfo(ctx, rc).ContentAccessLevel() >= level
}
}
func hasManifestAccess(ctx context.Context, rc requestContext, labels map[string]string, level auth.AccessLevel) bool {
return httpAuthorizationInfo(ctx, rc).ManifestAccessLevel(labels) >= level
}
var (
_ isAuthorizedFunc = requireUIUser
_ isAuthorizedFunc = anyAuthenticatedUser

View File

@@ -27,21 +27,10 @@
maxCacheSizeBytes = 1e6
)
func TestServer_REST(t *testing.T) {
testServer(t, true)
}
func TestServer_GRPC(t *testing.T) {
testServer(t, false)
}
//nolint:thelper
func testServer(t *testing.T, disableGRPC bool) {
func TestServer(t *testing.T) {
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
apiServerInfo := servertesting.StartServer(t, env, true)
apiServerInfo.DisableGRPC = disableGRPC
ctx2, cancel := context.WithCancel(ctx)
rep, err := servertesting.ConnectAndOpenAPIServer(t, ctx2, apiServerInfo, repo.ClientOptions{

View File

@@ -64,7 +64,6 @@ func StartServer(t *testing.T, env *repotesting.Environment, tls bool) *repo.API
m := mux.NewRouter()
s.SetupHTMLUIAPIHandlers(m)
s.SetupRepositoryAPIHandlers(m)
s.SetupControlAPIHandlers(m)
s.ServeStaticFiles(m, server.AssetFile())

View File

@@ -2,352 +2,14 @@
import (
"context"
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo/object"
)
// APIServerInfo is remote repository configuration stored in local configuration.
type APIServerInfo struct {
BaseURL string `json:"url"`
TrustedServerCertificateFingerprint string `json:"serverCertFingerprint"`
DisableGRPC bool `json:"disableGRPC,omitempty"`
}
// remoteRepository is an implementation of Repository that connects to an instance of
// API server hosted by `kopia server`, instead of directly manipulating files in the BLOB storage.
type apiServerRepository struct {
cli *apiclient.KopiaAPIClient
serverSupportsContentCompression bool
omgr *object.Manager
wso WriteSessionOptions
afterFlush []RepositoryWriterCallback
*immutableServerRepositoryParameters // immutable parameters
}
func (r *apiServerRepository) APIServerURL() string {
return r.cli.BaseURL
}
func (r *apiServerRepository) Description() string {
if r.cliOpts.Description != "" {
return r.cliOpts.Description
}
return fmt.Sprintf("Repository Server: %v", r.cli.BaseURL)
}
func (r *apiServerRepository) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
//nolint:wrapcheck
return object.Open(ctx, r, id)
}
func (r *apiServerRepository) NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer {
return r.omgr.NewWriter(ctx, opt)
}
// ConcatenateObjects creates a concatenated objects from the provided object IDs.
func (r *apiServerRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
//nolint:wrapcheck
return r.omgr.Concatenate(ctx, objectIDs)
}
func (r *apiServerRepository) VerifyObject(ctx context.Context, id object.ID) ([]content.ID, error) {
//nolint:wrapcheck
return object.VerifyObject(ctx, r, id)
}
func (r *apiServerRepository) GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error) {
var mm remoterepoapi.ManifestWithMetadata
if err := r.cli.Get(ctx, "manifests/"+string(id), manifest.ErrNotFound, &mm); err != nil {
return nil, errors.Wrap(err, "GetManifest")
}
//nolint:wrapcheck
return mm.Metadata, json.Unmarshal(mm.Payload, data)
}
func (r *apiServerRepository) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) {
v, err := json.Marshal(payload)
if err != nil {
return "", errors.Wrap(err, "unable to marshal JSON")
}
req := &remoterepoapi.ManifestWithMetadata{
Payload: json.RawMessage(v),
Metadata: &manifest.EntryMetadata{
Labels: labels,
},
}
resp := &manifest.EntryMetadata{}
if err := r.cli.Post(ctx, "manifests", req, resp); err != nil {
return "", errors.Wrap(err, "PutManifest")
}
return resp.ID, nil
}
// ReplaceManifests saves the given manifest payload with a set of labels and replaces any previous manifests with the same labels.
func (r *apiServerRepository) ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) {
return replaceManifestsHelper(ctx, r, labels, payload)
}
func (r *apiServerRepository) SetFindManifestPageSizeForTesting(v int32) {
_ = v
}
func (r *apiServerRepository) FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error) {
uv := make(url.Values)
for k, v := range labels {
uv.Add(k, v)
}
var mm []*manifest.EntryMetadata
if err := r.cli.Get(ctx, "manifests?"+uv.Encode(), nil, &mm); err != nil {
return nil, errors.Wrap(err, "FindManifests")
}
return mm, nil
}
func (r *apiServerRepository) DeleteManifest(ctx context.Context, id manifest.ID) error {
return errors.Wrap(r.cli.Delete(ctx, "manifests/"+string(id), manifest.ErrNotFound, nil, nil), "DeleteManifest")
}
func (r *apiServerRepository) Time() time.Time {
return clock.Now()
}
func (r *apiServerRepository) Refresh(ctx context.Context) error {
return nil
}
func (r *apiServerRepository) Flush(ctx context.Context) error {
if err := invokeCallbacks(ctx, r, r.beforeFlush); err != nil {
return errors.Wrap(err, "before flush")
}
if err := r.cli.Post(ctx, "flush", nil, nil); err != nil {
return errors.Wrap(err, "Flush")
}
if err := invokeCallbacks(ctx, r, r.afterFlush); err != nil {
return errors.Wrap(err, "after flush")
}
return nil
}
func (r *apiServerRepository) SupportsContentCompression() bool {
return r.serverSupportsContentCompression
}
func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOptions) (context.Context, RepositoryWriter, error) {
// apiServerRepository is stateless except object manager.
r2 := *r
w := &r2
// create object manager using a remote repo as contentManager implementation.
omgr, err := object.NewObjectManager(ctx, w, r.objectFormat, r.metricsRegistry)
if err != nil {
return nil, nil, errors.Wrap(err, "error initializing object manager")
}
w.omgr = omgr
w.wso = opt
w.afterFlush = nil
if w.wso.OnUpload == nil {
w.wso.OnUpload = func(_ int64) {}
}
r.addRef()
return ctx, w, nil
}
func (r *apiServerRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) {
var bi content.Info
//nolint:goconst
if err := r.cli.Get(ctx, "contents/"+contentID.String()+"?info=1", content.ErrContentNotFound, &bi); err != nil {
return content.Info{}, errors.Wrap(err, "ContentInfo")
}
return bi, nil
}
func (r *apiServerRepository) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) {
var tmp gather.WriteBuffer
defer tmp.Close()
err := r.contentCache.GetOrLoad(ctx, contentID.String(), func(_ *gather.WriteBuffer) error {
var result []byte
if err := r.cli.Get(ctx, "contents/"+contentID.String(), content.ErrContentNotFound, &result); err != nil {
return errors.Wrap(err, "GetContent")
}
tmp.Write(result) //nolint:errcheck
return nil
}, &tmp)
if err != nil {
//nolint:wrapcheck
return nil, err
}
return tmp.ToByteSlice(), nil
}
func (r *apiServerRepository) WriteContent(ctx context.Context, data gather.Bytes, prefix content.IDPrefix, comp compression.HeaderID) (content.ID, error) {
if err := prefix.ValidateSingle(); err != nil {
return content.EmptyID, errors.Wrap(err, "invalid prefix")
}
var hashOutput [128]byte
contentID, err := content.IDFromHash(prefix, r.h(hashOutput[:0], data))
if err != nil {
return content.EmptyID, errors.Wrap(err, "invalid content ID")
}
// if content is large enough, perform existence check on the server,
// for small contents we skip the check, since the server-side existence
// check is fast and we avoid double round trip.
if data.Length() >= writeContentCheckExistenceAboveSize {
if _, err := r.ContentInfo(ctx, contentID); err == nil {
// content already exists
return contentID, nil
}
}
r.wso.OnUpload(int64(data.Length()))
maybeCompression := ""
if comp != content.NoCompression {
maybeCompression = fmt.Sprintf("?compression=%x", comp)
}
if err := r.cli.Put(ctx, "contents/"+contentID.String()+maybeCompression, data.ToByteSlice(), nil); err != nil {
return content.EmptyID, errors.Wrapf(err, "error writing content %v", contentID)
}
if prefix != "" {
// add all prefixed contents to the cache.
r.contentCache.Put(ctx, contentID.String(), data)
}
return contentID, nil
}
// UpdateDescription updates the description of a connected repository.
func (r *apiServerRepository) UpdateDescription(d string) {
r.cliOpts.Description = d
}
func (r *apiServerRepository) PrefetchObjects(ctx context.Context, objectIDs []object.ID, hint string) ([]content.ID, error) {
//nolint:wrapcheck
return object.PrefetchBackingContents(ctx, r, objectIDs, hint)
}
func (r *apiServerRepository) PrefetchContents(ctx context.Context, contentIDs []content.ID, hint string) []content.ID {
resp := &remoterepoapi.PrefetchContentsResponse{}
if err := r.cli.Post(ctx, "contents/prefetch", remoterepoapi.PrefetchContentsRequest{
ContentIDs: contentIDs,
Hint: hint,
}, resp); err != nil {
log(ctx).Warnf("unable to prefetch contents: %v", err)
return nil
}
return resp.ContentIDs
}
func (r *apiServerRepository) ApplyRetentionPolicy(ctx context.Context, sourcePath string, reallyDelete bool) ([]manifest.ID, error) {
var result remoterepoapi.ApplyRetentionPolicyResponse
if err := r.cli.Post(ctx, "policies/apply-retention", remoterepoapi.ApplyRetentionPolicyRequest{
SourcePath: sourcePath,
ReallyDelete: reallyDelete,
}, &result); err != nil {
return nil, errors.Wrap(err, "unable to apply retention policy")
}
return result.ManifestIDs, nil
}
// OnSuccessfulFlush registers the provided callback to be invoked after flush succeeds.
func (r *apiServerRepository) OnSuccessfulFlush(callback RepositoryWriterCallback) {
r.afterFlush = append(r.afterFlush, callback)
}
var _ Repository = (*apiServerRepository)(nil)
// openRestAPIRepository connects remote repository over Kopia API.
func openRestAPIRepository(ctx context.Context, si *APIServerInfo, password string, par *immutableServerRepositoryParameters) (Repository, error) {
cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{
BaseURL: si.BaseURL,
TrustedServerCertificateFingerprint: si.TrustedServerCertificateFingerprint,
Username: par.cliOpts.UsernameAtHost(),
Password: password,
LogRequests: true,
})
if err != nil {
return nil, errors.Wrap(err, "unable to create API client")
}
rr := &apiServerRepository{
immutableServerRepositoryParameters: par,
cli: cli,
wso: WriteSessionOptions{
OnUpload: func(_ int64) {},
},
}
var p remoterepoapi.Parameters
if err = cli.Get(ctx, "repo/parameters", nil, &p); err != nil {
return nil, errors.Wrap(err, "unable to get repository parameters")
}
hf, err := hashing.CreateHashFunc(&p)
if err != nil {
return nil, errors.Wrap(err, "unable to create hash function")
}
rr.h = hf
rr.objectFormat = p.ObjectFormat
rr.serverSupportsContentCompression = p.SupportsContentCompression
// create object manager using rr as contentManager implementation.
omgr, err := object.NewObjectManager(ctx, rr, rr.objectFormat, par.metricsRegistry)
if err != nil {
return nil, errors.Wrap(err, "error initializing object manager")
}
rr.omgr = omgr
return rr, nil
}
// ConnectAPIServer sets up repository connection to a particular API server.

View File

@@ -195,10 +195,6 @@ func(ctx context.Context) error {
beforeFlush: options.BeforeFlush,
}
if si.DisableGRPC {
return openRestAPIRepository(ctx, si, password, par)
}
return openGRPCAPIRepository(ctx, si, password, par)
}

View File

@@ -640,20 +640,10 @@ func TestWriteSessionFlushOnSuccess(t *testing.T) {
require.EqualValues(t, 2, afterFlushCount.Load())
}
func TestWriteSessionFlushOnSuccessClient_REST(t *testing.T) {
testWriteSessionFlushOnSuccessClient(t, true)
}
func TestWriteSessionFlushOnSuccessClient_GRPC(t *testing.T) {
testWriteSessionFlushOnSuccessClient(t, false)
}
//nolint:thelper
func testWriteSessionFlushOnSuccessClient(t *testing.T, disableGRPC bool) {
func TestWriteSessionFlushOnSuccessClient(t *testing.T) {
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant, repotesting.Options{})
apiServerInfo := servertesting.StartServer(t, env, true)
apiServerInfo.DisableGRPC = disableGRPC
var beforeFlushCount, afterFlushCount atomic.Int32

View File

@@ -12,23 +12,9 @@
"github.com/kopia/kopia/tests/testenv"
)
func TestACL_GRPC(t *testing.T) {
verifyACL(t, false)
}
func TestACL_HTTP(t *testing.T) {
verifyACL(t, true)
}
//nolint:thelper
func verifyACL(t *testing.T, disableGRPC bool) {
func TestACL(t *testing.T) {
t.Parallel()
grpcArgument := "--grpc"
if disableGRPC {
grpcArgument = "--no-grpc"
}
serverRunner := testenv.NewInProcRunner(t)
serverEnvironment := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, serverRunner)
@@ -77,7 +63,6 @@ func verifyACL(t *testing.T, disableGRPC bool) {
"--server-control-password=admin-pwd",
"--tls-generate-cert",
"--tls-generate-rsa-key-size=2048", // use shorter key size to speed up generation
"--legacy-api",
)
t.Logf("detected server parameters %#v", sp)
@@ -99,7 +84,6 @@ func verifyACL(t *testing.T, disableGRPC bool) {
"--override-username", "foo",
"--override-hostname", "bar",
"--password", "baz",
grpcArgument,
)
anotherBarRunner := testenv.NewInProcRunner(t)
@@ -116,7 +100,6 @@ func verifyACL(t *testing.T, disableGRPC bool) {
"--override-username", "another",
"--override-hostname", "bar",
"--password", "baz",
grpcArgument,
)
aliceInWonderlandRunner := testenv.NewInProcRunner(t)
@@ -133,7 +116,6 @@ func verifyACL(t *testing.T, disableGRPC bool) {
"--override-username", "alice",
"--override-hostname", "wonderland",
"--password", "baz",
grpcArgument,
)
// both alice and foo@bar can see global policy

View File

@@ -38,34 +38,22 @@
controlPassword = "control-password"
)
func TestAPIServerRepository_GRPC_htpasswd(t *testing.T) {
func TestAPIServerRepository_htpasswd(t *testing.T) {
t.Parallel()
testAPIServerRepository(t, []string{"--no-legacy-api"}, true, false)
testAPIServerRepository(t, false)
}
func TestAPIServerRepository_GRPC_RepositoryUsers(t *testing.T) {
func TestAPIServerRepository_RepositoryUsers(t *testing.T) {
t.Parallel()
testAPIServerRepository(t, []string{"--no-legacy-api"}, true, true)
}
func TestAPIServerRepository_DisableGRPC_htpasswd(t *testing.T) {
t.Parallel()
testAPIServerRepository(t, []string{"--no-grpc", "--legacy-api"}, false, false)
testAPIServerRepository(t, true)
}
//nolint:thelper
func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, allowRepositoryUsers bool) {
func testAPIServerRepository(t *testing.T, allowRepositoryUsers bool) {
ctx := testlogging.Context(t)
var connectArgs []string
if !useGRPC {
connectArgs = []string{"--no-grpc"}
}
runner := testenv.NewInProcRunner(t)
e := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, runner)
@@ -98,6 +86,8 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
tlsCert := filepath.Join(e.ConfigDir, "tls.cert")
tlsKey := filepath.Join(e.ConfigDir, "tls.key")
var serverStartArgs []string
if allowRepositoryUsers {
e.RunAndExpectSuccess(t, "server", "users", "add", "foo@bar", "--user-password", "baz")
} else {
@@ -146,7 +136,6 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
rep, err := servertesting.ConnectAndOpenAPIServer(t, ctx2, &repo.APIServerInfo{
BaseURL: sp.BaseURL,
TrustedServerCertificateFingerprint: sp.SHA256Fingerprint,
DisableGRPC: !useGRPC,
}, repo.ClientOptions{
Username: "foo",
Hostname: "bar",
@@ -205,15 +194,9 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
verifyFindManifestCount(ctx, t, rep, pageSize, someLabels, 5)
}
if useGRPC {
// the same method on a GRPC write session should fail because the stream was broken.
_, err := writeSess.FindManifests(ctx, someLabels)
require.Error(t, err)
} else {
// invoke some method on write session, this will succeed because legacy API is stateless
// (also incorrect in this case).
verifyFindManifestCount(ctx, t, writeSess, 1, someLabels, 5)
}
// the same method on a GRPC write session should fail because the stream was broken.
_, err = writeSess.FindManifests(ctx, someLabels)
require.Error(t, err)
runner2 := testenv.NewInProcRunner(t)
e2 := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, runner2)
@@ -221,14 +204,14 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
defer e2.RunAndExpectSuccess(t, "repo", "disconnect")
e2.RunAndExpectSuccess(t, append([]string{
e2.RunAndExpectSuccess(t,
"repo", "connect", "server",
"--url", sp.BaseURL + "/",
"--url", sp.BaseURL+"/",
"--server-cert-fingerprint", sp.SHA256Fingerprint,
"--override-username", "foo",
"--override-hostname", "bar",
"--password", "baz",
}, connectArgs...)...)
)
// we are providing custom password to connect, make sure we won't be providing
// (different) default password via environment variable, as command-line password
@@ -275,7 +258,6 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
servertesting.ConnectAndOpenAPIServer(t, ctx, &repo.APIServerInfo{
BaseURL: sp.BaseURL,
TrustedServerCertificateFingerprint: sp.SHA256Fingerprint,
DisableGRPC: !useGRPC,
}, repo.ClientOptions{
Username: "foo",
Hostname: "bar",
@@ -321,7 +303,6 @@ func TestFindManifestsPaginationOverGRPC(t *testing.T) {
"server", "start",
"--address=localhost:0",
"--grpc",
"--no-legacy-api",
"--tls-key-file", tlsKey,
"--tls-cert-file", tlsCert,
"--tls-generate-cert",