diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 753ad5ca7..f85d8ea0d 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -133,14 +133,14 @@ func testServer(t *testing.T, disableGRPC bool) { remoteRepositoryTest(ctx, t, rep) } -func TestGPRServer_AuthenticationError(t *testing.T) { +func TestGRPCServer_AuthenticationError(t *testing.T) { ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) apiServerInfo := startServer(t, env, true) - if _, err := repo.OpenGRPCAPIRepository(ctx, apiServerInfo, repo.ClientOptions{ + if _, err := repo.OpenAPIServer(ctx, apiServerInfo, repo.ClientOptions{ Username: "bad-username", Hostname: "bad-hostname", - }, nil, "bad-password", nil); err == nil { + }, nil, "bad-password"); err == nil { t.Fatal("unexpected success when connecting with invalid username") } } diff --git a/repo/api_server_repository.go b/repo/api_server_repository.go index d6c43ea8d..7f4c68d34 100644 --- a/repo/api_server_repository.go +++ b/repo/api_server_repository.go @@ -10,14 +10,11 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/apiclient" - "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/gather" - "github.com/kopia/kopia/internal/metrics" "github.com/kopia/kopia/internal/remoterepoapi" "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/content" - "github.com/kopia/kopia/repo/format" "github.com/kopia/kopia/repo/hashing" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/repo/object" @@ -34,17 +31,11 @@ type APIServerInfo struct { // API server hosted by `kopia server`, instead of directly manipulating files in the BLOB storage. type apiServerRepository struct { cli *apiclient.KopiaAPIClient - h hashing.HashFunc - objectFormat format.ObjectFormat serverSupportsContentCompression bool - cliOpts ClientOptions omgr *object.Manager wso WriteSessionOptions - metricsRegistry *metrics.Registry - isSharedReadOnlySession bool - contentCache *cache.PersistentCache - *refCountedCloser + immutableServerRepositoryParameters // immutable parameters } func (r *apiServerRepository) APIServerURL() string { @@ -59,10 +50,6 @@ func (r *apiServerRepository) Description() string { return fmt.Sprintf("Repository Server: %v", r.cli.BaseURL) } -func (r *apiServerRepository) ClientOptions() ClientOptions { - return r.cliOpts -} - func (r *apiServerRepository) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) { //nolint:wrapcheck return object.Open(ctx, r, id) @@ -166,7 +153,7 @@ func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOpt w.omgr = omgr w.wso = opt - r.refCountedCloser.addRef() + r.addRef() return ctx, w, nil } @@ -268,19 +255,14 @@ func (r *apiServerRepository) PrefetchContents(ctx context.Context, contentIDs [ return resp.ContentIDs } -// Metrics provides access to the metrics registry. -func (r *apiServerRepository) Metrics() *metrics.Registry { - return r.metricsRegistry -} - var _ Repository = (*apiServerRepository)(nil) // openRestAPIRepository connects remote repository over Kopia API. -func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, contentCache *cache.PersistentCache, password string, mr *metrics.Registry) (Repository, error) { +func openRestAPIRepository(ctx context.Context, si *APIServerInfo, password string, par immutableServerRepositoryParameters) (Repository, error) { cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{ BaseURL: si.BaseURL, TrustedServerCertificateFingerprint: si.TrustedServerCertificateFingerprint, - Username: cliOpts.UsernameAtHost(), + Username: par.cliOpts.UsernameAtHost(), Password: password, LogRequests: true, }) @@ -288,27 +270,12 @@ func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien return nil, errors.Wrap(err, "unable to create API client") } - rcc := newRefCountedCloser( - func(ctx context.Context) error { - if contentCache != nil { - contentCache.Close(ctx) - } - - return nil - }, - mr.Close, - ) - rr := &apiServerRepository{ - cli: cli, - cliOpts: cliOpts, - contentCache: contentCache, + immutableServerRepositoryParameters: par, + cli: cli, wso: WriteSessionOptions{ OnUpload: func(i int64) {}, }, - isSharedReadOnlySession: true, - metricsRegistry: mr, - refCountedCloser: rcc, } var p remoterepoapi.Parameters @@ -327,7 +294,7 @@ func(ctx context.Context) error { rr.serverSupportsContentCompression = p.SupportsContentCompression // create object manager using rr as contentManager implementation. - omgr, err := object.NewObjectManager(ctx, rr, rr.objectFormat, mr) + omgr, err := object.NewObjectManager(ctx, rr, rr.objectFormat, par.metricsRegistry) if err != nil { return nil, errors.Wrap(err, "error initializing object manager") } diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index 71bede5d9..f3bdf9a90 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -16,12 +16,10 @@ "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/ctxutil" "github.com/kopia/kopia/internal/gather" apipb "github.com/kopia/kopia/internal/grpcapi" - "github.com/kopia/kopia/internal/metrics" "github.com/kopia/kopia/internal/retry" "github.com/kopia/kopia/internal/tlsutil" "github.com/kopia/kopia/repo/blob" @@ -58,7 +56,6 @@ func errNoSessionResponse() error { // grpcRepositoryClient is an implementation of Repository that connects to an instance of // GPRC API server hosted by `kopia server`. type grpcRepositoryClient struct { - *refCountedCloser conn *grpc.ClientConn innerSessionMutex sync.Mutex @@ -76,17 +73,12 @@ type grpcRepositoryClient struct { asyncWritesWG *errgroup.Group - h hashing.HashFunc - objectFormat format.ObjectFormat + immutableServerRepositoryParameters + serverSupportsContentCompression bool - cliOpts ClientOptions omgr *object.Manager - contentCache *cache.PersistentCache - recent recentlyRead - - metricsRegistry *metrics.Registry } type grpcInnerSession struct { @@ -209,10 +201,6 @@ func (r *grpcRepositoryClient) LegacyWriter() RepositoryWriter { return nil } -func (r *grpcRepositoryClient) ClientOptions() ClientOptions { - return r.cliOpts -} - func (r *grpcRepositoryClient) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) { //nolint:wrapcheck return object.Open(ctx, r, id) @@ -460,7 +448,7 @@ func (r *grpcInnerSession) Flush(ctx context.Context) error { } func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOptions) (context.Context, RepositoryWriter, error) { - w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.refCountedCloser, r.cliOpts, opt, r.contentCache, false, r.metricsRegistry) + w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, opt, false, r.immutableServerRepositoryParameters) if err != nil { return nil, nil, err } @@ -753,9 +741,9 @@ func (c grpcCreds) RequireTransportSecurity() bool { return true } -// OpenGRPCAPIRepository opens the Repository based on remote GRPC server. +// openGRPCAPIRepository opens the Repository based on remote GRPC server. // The APIServerInfo must have the address of the repository as 'https://host:port' -func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, contentCache *cache.PersistentCache, password string, mr *metrics.Registry) (Repository, error) { +func openGRPCAPIRepository(ctx context.Context, si *APIServerInfo, password string, par immutableServerRepositoryParameters) (Repository, error) { var transportCreds credentials.TransportCredentials if si.TrustedServerCertificateFingerprint != "" { @@ -775,7 +763,7 @@ func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien conn, err := grpc.Dial( u.Hostname()+":"+u.Port(), - grpc.WithPerRPCCredentials(grpcCreds{cliOpts.Hostname, cliOpts.Username, password}), + grpc.WithPerRPCCredentials(grpcCreds{par.cliOpts.Hostname, par.cliOpts.Username, password}), grpc.WithTransportCredentials(transportCreds), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize), @@ -786,18 +774,12 @@ func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien return nil, errors.Wrap(err, "dial error") } - rcc := newRefCountedCloser( + par.refCountedCloser.registerEarlyCloseFunc( func(ctx context.Context) error { return errors.Wrap(conn.Close(), "error closing GRPC connection") - }, - func(ctx context.Context) error { - contentCache.Close(ctx) - return nil - }, - mr.Close, - ) + }) - rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, rcc, cliOpts, WriteSessionOptions{}, contentCache, true, mr) + rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, WriteSessionOptions{}, true, par) if err != nil { return nil, err } @@ -864,36 +846,25 @@ func (r *grpcRepositoryClient) killInnerSession() { } } -// Metrics provides access to the metrics registry. -func (r *grpcRepositoryClient) Metrics() *metrics.Registry { - return r.metricsRegistry -} - // newGRPCAPIRepositoryForConnection opens GRPC-based repository connection. func newGRPCAPIRepositoryForConnection( ctx context.Context, conn *grpc.ClientConn, - sc *refCountedCloser, - cliOpts ClientOptions, opt WriteSessionOptions, - contentCache *cache.PersistentCache, transparentRetries bool, - mr *metrics.Registry, + par immutableServerRepositoryParameters, ) (*grpcRepositoryClient, error) { if opt.OnUpload == nil { opt.OnUpload = func(i int64) {} } rr := &grpcRepositoryClient{ - refCountedCloser: sc, - conn: conn, - cliOpts: cliOpts, - transparentRetries: transparentRetries, - opt: opt, - isReadOnly: cliOpts.ReadOnly, - contentCache: contentCache, - asyncWritesWG: new(errgroup.Group), - metricsRegistry: mr, + immutableServerRepositoryParameters: par, + conn: conn, + transparentRetries: transparentRetries, + opt: opt, + isReadOnly: par.cliOpts.ReadOnly, + asyncWritesWG: new(errgroup.Group), } return inSessionWithoutRetry(ctx, rr, func(ctx context.Context, sess *grpcInnerSession) (*grpcRepositoryClient, error) { diff --git a/repo/open.go b/repo/open.go index dc69c3737..d44e73cf0 100644 --- a/repo/open.go +++ b/repo/open.go @@ -161,18 +161,36 @@ func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, pass // OpenAPIServer connects remote repository over Kopia API. func OpenAPIServer(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, cachingOptions *content.CachingOptions, password string) (Repository, error) { - me := metrics.NewRegistry() + mr := metrics.NewRegistry() - contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password, me) + contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password, mr) if err != nil { return nil, errors.Wrap(err, "error opening content cache") } - if si.DisableGRPC { - return openRestAPIRepository(ctx, si, cliOpts, contentCache, password, me) + closer := newRefCountedCloser( + func(ctx context.Context) error { + if contentCache != nil { + contentCache.Close(ctx) + } + + return nil + }, + mr.Close, + ) + + par := immutableServerRepositoryParameters{ + cliOpts: cliOpts, + contentCache: contentCache, + metricsRegistry: mr, + refCountedCloser: closer, } - return OpenGRPCAPIRepository(ctx, si, cliOpts, contentCache, password, me) + if si.DisableGRPC { + return openRestAPIRepository(ctx, si, password, par) + } + + return openGRPCAPIRepository(ctx, si, password, par) } // openDirect opens the repository that directly manipulates blob storage.. @@ -215,8 +233,8 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, DisableInternalLog: options.DisableInternalLog, } - me := metrics.NewRegistry() - st = storagemetrics.NewWrapper(st, me) + mr := metrics.NewRegistry() + st = storagemetrics.NewWrapper(st, mr) fmgr, ferr := format.NewManager(ctx, st, cacheOpts.CacheDirectory, cliOpts.FormatBlobCacheDuration, password, cmOpts.TimeNow) if ferr != nil { @@ -286,7 +304,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, // background/interleaving upgrade lock storage monitor st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures) - scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, me) + scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, mr) if ferr != nil { return nil, errors.Wrap(ferr, "unable to create shared content manager") } @@ -296,19 +314,19 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, SessionHost: cliOpts.Hostname, }, "") - om, ferr := object.NewObjectManager(ctx, cm, fmgr.ObjectFormat(), me) + om, ferr := object.NewObjectManager(ctx, cm, fmgr.ObjectFormat(), mr) if ferr != nil { return nil, errors.Wrap(ferr, "unable to open object manager") } - manifests, ferr := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow}, me) + manifests, ferr := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow}, mr) if ferr != nil { return nil, errors.Wrap(ferr, "unable to open manifests") } - rcc := newRefCountedCloser( + closer := newRefCountedCloser( scm.CloseShared, - me.Close, + mr.Close, ) dr := &directRepository{ @@ -317,17 +335,16 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, blobs: st, mmgr: manifests, sm: scm, - directRepositoryParameters: directRepositoryParameters{ - cachingOptions: *cacheOpts, - fmgr: fmgr, - timeNow: cmOpts.TimeNow, - cliOpts: cliOpts, - configFile: configFile, - nextWriterID: new(int32), - throttler: throttler, - metricsRegistry: me, - - refCountedCloser: rcc, + immutableDirectRepositoryParameters: immutableDirectRepositoryParameters{ + cachingOptions: *cacheOpts, + fmgr: fmgr, + timeNow: cmOpts.TimeNow, + cliOpts: cliOpts, + configFile: configFile, + nextWriterID: new(int32), + throttler: throttler, + metricsRegistry: mr, + refCountedCloser: closer, }, } diff --git a/repo/refcount_closer.go b/repo/refcount_closer.go index 8c4c6a71b..6cf9598d4 100644 --- a/repo/refcount_closer.go +++ b/repo/refcount_closer.go @@ -49,6 +49,10 @@ func (c *refCountedCloser) addRef() { c.refCount.Add(1) } +func (c *refCountedCloser) registerEarlyCloseFunc(f closeFunc) { + c.closers = append(c.closers, append([]closeFunc(nil), f)...) +} + func newRefCountedCloser(f ...closeFunc) *refCountedCloser { rcc := &refCountedCloser{ closers: f, diff --git a/repo/repository.go b/repo/repository.go index 556c64e1f..ad60b8470 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -88,7 +88,7 @@ type DirectRepositoryWriter interface { // RollbackUpgrade(ctx context.Context) error } -type directRepositoryParameters struct { +type immutableDirectRepositoryParameters struct { configFile string cachingOptions content.CachingOptions cliOpts ClientOptions @@ -103,7 +103,7 @@ type directRepositoryParameters struct { // directRepository is an implementation of repository that directly manipulates underlying storage. type directRepository struct { - directRepositoryParameters + immutableDirectRepositoryParameters blobs blob.Storage cmgr *content.WriteManager @@ -262,12 +262,12 @@ func (r *directRepository) NewDirectWriter(ctx context.Context, opt WriteSession } w := &directRepository{ - directRepositoryParameters: r.directRepositoryParameters, - blobs: r.blobs, - cmgr: cmgr, - omgr: omgr, - mmgr: mmgr, - sm: r.sm, + immutableDirectRepositoryParameters: r.immutableDirectRepositoryParameters, + blobs: r.blobs, + cmgr: cmgr, + omgr: omgr, + mmgr: mmgr, + sm: r.sm, } w.addRef() diff --git a/repo/server_repository_params.go b/repo/server_repository_params.go new file mode 100644 index 000000000..cd870d37c --- /dev/null +++ b/repo/server_repository_params.go @@ -0,0 +1,27 @@ +package repo + +import ( + "github.com/kopia/kopia/internal/cache" + "github.com/kopia/kopia/internal/metrics" + "github.com/kopia/kopia/repo/format" + "github.com/kopia/kopia/repo/hashing" +) + +// immutableServerRepositoryParameters contains immutable parameters shared between HTTP and GRPC clients. +type immutableServerRepositoryParameters struct { + h hashing.HashFunc + objectFormat format.ObjectFormat + cliOpts ClientOptions + metricsRegistry *metrics.Registry + contentCache *cache.PersistentCache + *refCountedCloser +} + +// Metrics provides access to the metrics registry. +func (r *immutableServerRepositoryParameters) Metrics() *metrics.Registry { + return r.metricsRegistry +} + +func (r *immutableServerRepositoryParameters) ClientOptions() ClientOptions { + return r.cliOpts +}