refactor(repository): refactored server repository implementations (#2595)

This extracts common bits shared by HTTP and GRPC implementations into
one.
This commit is contained in:
Jarek Kowalski
2022-11-21 20:29:05 -08:00
committed by GitHub
parent f50779241e
commit 82b3f1c648
7 changed files with 105 additions and 119 deletions

View File

@@ -133,14 +133,14 @@ func testServer(t *testing.T, disableGRPC bool) {
remoteRepositoryTest(ctx, t, rep)
}
func TestGPRServer_AuthenticationError(t *testing.T) {
func TestGRPCServer_AuthenticationError(t *testing.T) {
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
apiServerInfo := startServer(t, env, true)
if _, err := repo.OpenGRPCAPIRepository(ctx, apiServerInfo, repo.ClientOptions{
if _, err := repo.OpenAPIServer(ctx, apiServerInfo, repo.ClientOptions{
Username: "bad-username",
Hostname: "bad-hostname",
}, nil, "bad-password", nil); err == nil {
}, nil, "bad-password"); err == nil {
t.Fatal("unexpected success when connecting with invalid username")
}
}

View File

@@ -10,14 +10,11 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo/object"
@@ -34,17 +31,11 @@ type APIServerInfo struct {
// API server hosted by `kopia server`, instead of directly manipulating files in the BLOB storage.
type apiServerRepository struct {
cli *apiclient.KopiaAPIClient
h hashing.HashFunc
objectFormat format.ObjectFormat
serverSupportsContentCompression bool
cliOpts ClientOptions
omgr *object.Manager
wso WriteSessionOptions
metricsRegistry *metrics.Registry
isSharedReadOnlySession bool
contentCache *cache.PersistentCache
*refCountedCloser
immutableServerRepositoryParameters // immutable parameters
}
func (r *apiServerRepository) APIServerURL() string {
@@ -59,10 +50,6 @@ func (r *apiServerRepository) Description() string {
return fmt.Sprintf("Repository Server: %v", r.cli.BaseURL)
}
func (r *apiServerRepository) ClientOptions() ClientOptions {
return r.cliOpts
}
func (r *apiServerRepository) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
//nolint:wrapcheck
return object.Open(ctx, r, id)
@@ -166,7 +153,7 @@ func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOpt
w.omgr = omgr
w.wso = opt
r.refCountedCloser.addRef()
r.addRef()
return ctx, w, nil
}
@@ -268,19 +255,14 @@ func (r *apiServerRepository) PrefetchContents(ctx context.Context, contentIDs [
return resp.ContentIDs
}
// Metrics provides access to the metrics registry.
func (r *apiServerRepository) Metrics() *metrics.Registry {
return r.metricsRegistry
}
var _ Repository = (*apiServerRepository)(nil)
// openRestAPIRepository connects remote repository over Kopia API.
func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, contentCache *cache.PersistentCache, password string, mr *metrics.Registry) (Repository, error) {
func openRestAPIRepository(ctx context.Context, si *APIServerInfo, password string, par immutableServerRepositoryParameters) (Repository, error) {
cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{
BaseURL: si.BaseURL,
TrustedServerCertificateFingerprint: si.TrustedServerCertificateFingerprint,
Username: cliOpts.UsernameAtHost(),
Username: par.cliOpts.UsernameAtHost(),
Password: password,
LogRequests: true,
})
@@ -288,27 +270,12 @@ func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien
return nil, errors.Wrap(err, "unable to create API client")
}
rcc := newRefCountedCloser(
func(ctx context.Context) error {
if contentCache != nil {
contentCache.Close(ctx)
}
return nil
},
mr.Close,
)
rr := &apiServerRepository{
cli: cli,
cliOpts: cliOpts,
contentCache: contentCache,
immutableServerRepositoryParameters: par,
cli: cli,
wso: WriteSessionOptions{
OnUpload: func(i int64) {},
},
isSharedReadOnlySession: true,
metricsRegistry: mr,
refCountedCloser: rcc,
}
var p remoterepoapi.Parameters
@@ -327,7 +294,7 @@ func(ctx context.Context) error {
rr.serverSupportsContentCompression = p.SupportsContentCompression
// create object manager using rr as contentManager implementation.
omgr, err := object.NewObjectManager(ctx, rr, rr.objectFormat, mr)
omgr, err := object.NewObjectManager(ctx, rr, rr.objectFormat, par.metricsRegistry)
if err != nil {
return nil, errors.Wrap(err, "error initializing object manager")
}

View File

@@ -16,12 +16,10 @@
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/ctxutil"
"github.com/kopia/kopia/internal/gather"
apipb "github.com/kopia/kopia/internal/grpcapi"
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/internal/retry"
"github.com/kopia/kopia/internal/tlsutil"
"github.com/kopia/kopia/repo/blob"
@@ -58,7 +56,6 @@ func errNoSessionResponse() error {
// grpcRepositoryClient is an implementation of Repository that connects to an instance of
// GPRC API server hosted by `kopia server`.
type grpcRepositoryClient struct {
*refCountedCloser
conn *grpc.ClientConn
innerSessionMutex sync.Mutex
@@ -76,17 +73,12 @@ type grpcRepositoryClient struct {
asyncWritesWG *errgroup.Group
h hashing.HashFunc
objectFormat format.ObjectFormat
immutableServerRepositoryParameters
serverSupportsContentCompression bool
cliOpts ClientOptions
omgr *object.Manager
contentCache *cache.PersistentCache
recent recentlyRead
metricsRegistry *metrics.Registry
}
type grpcInnerSession struct {
@@ -209,10 +201,6 @@ func (r *grpcRepositoryClient) LegacyWriter() RepositoryWriter {
return nil
}
func (r *grpcRepositoryClient) ClientOptions() ClientOptions {
return r.cliOpts
}
func (r *grpcRepositoryClient) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
//nolint:wrapcheck
return object.Open(ctx, r, id)
@@ -460,7 +448,7 @@ func (r *grpcInnerSession) Flush(ctx context.Context) error {
}
func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOptions) (context.Context, RepositoryWriter, error) {
w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.refCountedCloser, r.cliOpts, opt, r.contentCache, false, r.metricsRegistry)
w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, opt, false, r.immutableServerRepositoryParameters)
if err != nil {
return nil, nil, err
}
@@ -753,9 +741,9 @@ func (c grpcCreds) RequireTransportSecurity() bool {
return true
}
// OpenGRPCAPIRepository opens the Repository based on remote GRPC server.
// openGRPCAPIRepository opens the Repository based on remote GRPC server.
// The APIServerInfo must have the address of the repository as 'https://host:port'
func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, contentCache *cache.PersistentCache, password string, mr *metrics.Registry) (Repository, error) {
func openGRPCAPIRepository(ctx context.Context, si *APIServerInfo, password string, par immutableServerRepositoryParameters) (Repository, error) {
var transportCreds credentials.TransportCredentials
if si.TrustedServerCertificateFingerprint != "" {
@@ -775,7 +763,7 @@ func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien
conn, err := grpc.Dial(
u.Hostname()+":"+u.Port(),
grpc.WithPerRPCCredentials(grpcCreds{cliOpts.Hostname, cliOpts.Username, password}),
grpc.WithPerRPCCredentials(grpcCreds{par.cliOpts.Hostname, par.cliOpts.Username, password}),
grpc.WithTransportCredentials(transportCreds),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize),
@@ -786,18 +774,12 @@ func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien
return nil, errors.Wrap(err, "dial error")
}
rcc := newRefCountedCloser(
par.refCountedCloser.registerEarlyCloseFunc(
func(ctx context.Context) error {
return errors.Wrap(conn.Close(), "error closing GRPC connection")
},
func(ctx context.Context) error {
contentCache.Close(ctx)
return nil
},
mr.Close,
)
})
rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, rcc, cliOpts, WriteSessionOptions{}, contentCache, true, mr)
rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, WriteSessionOptions{}, true, par)
if err != nil {
return nil, err
}
@@ -864,36 +846,25 @@ func (r *grpcRepositoryClient) killInnerSession() {
}
}
// Metrics provides access to the metrics registry.
func (r *grpcRepositoryClient) Metrics() *metrics.Registry {
return r.metricsRegistry
}
// newGRPCAPIRepositoryForConnection opens GRPC-based repository connection.
func newGRPCAPIRepositoryForConnection(
ctx context.Context,
conn *grpc.ClientConn,
sc *refCountedCloser,
cliOpts ClientOptions,
opt WriteSessionOptions,
contentCache *cache.PersistentCache,
transparentRetries bool,
mr *metrics.Registry,
par immutableServerRepositoryParameters,
) (*grpcRepositoryClient, error) {
if opt.OnUpload == nil {
opt.OnUpload = func(i int64) {}
}
rr := &grpcRepositoryClient{
refCountedCloser: sc,
conn: conn,
cliOpts: cliOpts,
transparentRetries: transparentRetries,
opt: opt,
isReadOnly: cliOpts.ReadOnly,
contentCache: contentCache,
asyncWritesWG: new(errgroup.Group),
metricsRegistry: mr,
immutableServerRepositoryParameters: par,
conn: conn,
transparentRetries: transparentRetries,
opt: opt,
isReadOnly: par.cliOpts.ReadOnly,
asyncWritesWG: new(errgroup.Group),
}
return inSessionWithoutRetry(ctx, rr, func(ctx context.Context, sess *grpcInnerSession) (*grpcRepositoryClient, error) {

View File

@@ -161,18 +161,36 @@ func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, pass
// OpenAPIServer connects remote repository over Kopia API.
func OpenAPIServer(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, cachingOptions *content.CachingOptions, password string) (Repository, error) {
me := metrics.NewRegistry()
mr := metrics.NewRegistry()
contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password, me)
contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password, mr)
if err != nil {
return nil, errors.Wrap(err, "error opening content cache")
}
if si.DisableGRPC {
return openRestAPIRepository(ctx, si, cliOpts, contentCache, password, me)
closer := newRefCountedCloser(
func(ctx context.Context) error {
if contentCache != nil {
contentCache.Close(ctx)
}
return nil
},
mr.Close,
)
par := immutableServerRepositoryParameters{
cliOpts: cliOpts,
contentCache: contentCache,
metricsRegistry: mr,
refCountedCloser: closer,
}
return OpenGRPCAPIRepository(ctx, si, cliOpts, contentCache, password, me)
if si.DisableGRPC {
return openRestAPIRepository(ctx, si, password, par)
}
return openGRPCAPIRepository(ctx, si, password, par)
}
// openDirect opens the repository that directly manipulates blob storage..
@@ -215,8 +233,8 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
DisableInternalLog: options.DisableInternalLog,
}
me := metrics.NewRegistry()
st = storagemetrics.NewWrapper(st, me)
mr := metrics.NewRegistry()
st = storagemetrics.NewWrapper(st, mr)
fmgr, ferr := format.NewManager(ctx, st, cacheOpts.CacheDirectory, cliOpts.FormatBlobCacheDuration, password, cmOpts.TimeNow)
if ferr != nil {
@@ -286,7 +304,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
// background/interleaving upgrade lock storage monitor
st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures)
scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, me)
scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, mr)
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to create shared content manager")
}
@@ -296,19 +314,19 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
SessionHost: cliOpts.Hostname,
}, "")
om, ferr := object.NewObjectManager(ctx, cm, fmgr.ObjectFormat(), me)
om, ferr := object.NewObjectManager(ctx, cm, fmgr.ObjectFormat(), mr)
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to open object manager")
}
manifests, ferr := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow}, me)
manifests, ferr := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow}, mr)
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to open manifests")
}
rcc := newRefCountedCloser(
closer := newRefCountedCloser(
scm.CloseShared,
me.Close,
mr.Close,
)
dr := &directRepository{
@@ -317,17 +335,16 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
blobs: st,
mmgr: manifests,
sm: scm,
directRepositoryParameters: directRepositoryParameters{
cachingOptions: *cacheOpts,
fmgr: fmgr,
timeNow: cmOpts.TimeNow,
cliOpts: cliOpts,
configFile: configFile,
nextWriterID: new(int32),
throttler: throttler,
metricsRegistry: me,
refCountedCloser: rcc,
immutableDirectRepositoryParameters: immutableDirectRepositoryParameters{
cachingOptions: *cacheOpts,
fmgr: fmgr,
timeNow: cmOpts.TimeNow,
cliOpts: cliOpts,
configFile: configFile,
nextWriterID: new(int32),
throttler: throttler,
metricsRegistry: mr,
refCountedCloser: closer,
},
}

View File

@@ -49,6 +49,10 @@ func (c *refCountedCloser) addRef() {
c.refCount.Add(1)
}
func (c *refCountedCloser) registerEarlyCloseFunc(f closeFunc) {
c.closers = append(c.closers, append([]closeFunc(nil), f)...)
}
func newRefCountedCloser(f ...closeFunc) *refCountedCloser {
rcc := &refCountedCloser{
closers: f,

View File

@@ -88,7 +88,7 @@ type DirectRepositoryWriter interface {
// RollbackUpgrade(ctx context.Context) error
}
type directRepositoryParameters struct {
type immutableDirectRepositoryParameters struct {
configFile string
cachingOptions content.CachingOptions
cliOpts ClientOptions
@@ -103,7 +103,7 @@ type directRepositoryParameters struct {
// directRepository is an implementation of repository that directly manipulates underlying storage.
type directRepository struct {
directRepositoryParameters
immutableDirectRepositoryParameters
blobs blob.Storage
cmgr *content.WriteManager
@@ -262,12 +262,12 @@ func (r *directRepository) NewDirectWriter(ctx context.Context, opt WriteSession
}
w := &directRepository{
directRepositoryParameters: r.directRepositoryParameters,
blobs: r.blobs,
cmgr: cmgr,
omgr: omgr,
mmgr: mmgr,
sm: r.sm,
immutableDirectRepositoryParameters: r.immutableDirectRepositoryParameters,
blobs: r.blobs,
cmgr: cmgr,
omgr: omgr,
mmgr: mmgr,
sm: r.sm,
}
w.addRef()

View File

@@ -0,0 +1,27 @@
package repo
import (
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/hashing"
)
// immutableServerRepositoryParameters contains immutable parameters shared between HTTP and GRPC clients.
type immutableServerRepositoryParameters struct {
h hashing.HashFunc
objectFormat format.ObjectFormat
cliOpts ClientOptions
metricsRegistry *metrics.Registry
contentCache *cache.PersistentCache
*refCountedCloser
}
// Metrics provides access to the metrics registry.
func (r *immutableServerRepositoryParameters) Metrics() *metrics.Registry {
return r.metricsRegistry
}
func (r *immutableServerRepositoryParameters) ClientOptions() ClientOptions {
return r.cliOpts
}