From 369d304084c7c55f490dfc40f14ce60a79937629 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sun, 6 Mar 2022 16:56:30 -0800 Subject: [PATCH] refactor(repository): better context cancelation handling (#1802) Instead of ignoring context cancelation in Open(), ensure we don't spawn goroutines that might be canceled. --- cli/command_server_start.go | 5 ++-- internal/ctxutil/detach.go | 7 ++++++ internal/epoch/epoch_manager.go | 24 +++++++------------ internal/repotesting/repotesting.go | 2 +- internal/server/server_test.go | 9 ++++++- repo/open.go | 5 ---- .../api_server_repository_test.go | 8 ++++++- 7 files changed, 35 insertions(+), 25 deletions(-) diff --git a/cli/command_server_start.go b/cli/command_server_start.go index ba809e00e..6eb2c0e94 100644 --- a/cli/command_server_start.go +++ b/cli/command_server_start.go @@ -21,6 +21,7 @@ htpasswd "github.com/tg123/go-htpasswd" "github.com/kopia/kopia/internal/auth" + "github.com/kopia/kopia/internal/ctxutil" "github.com/kopia/kopia/internal/server" "github.com/kopia/kopia/repo" ) @@ -222,12 +223,12 @@ func (c *commandServerStart) run(ctx context.Context) error { if c.serverStartShutdownWhenStdinClosed { log(ctx).Infof("Server will close when stdin is closed...") - go func() { + ctxutil.GoDetached(ctx, func(ctx context.Context) { // consume all stdin and close the server when it closes io.ReadAll(os.Stdin) //nolint:errcheck log(ctx).Infof("Shutting down server...") httpServer.Shutdown(ctx) //nolint:errcheck - }() + }) } onExternalConfigReloadRequest(func() { diff --git a/internal/ctxutil/detach.go b/internal/ctxutil/detach.go index 86f161ffc..81a8d40f2 100644 --- a/internal/ctxutil/detach.go +++ b/internal/ctxutil/detach.go @@ -16,6 +16,13 @@ func Detach(ctx context.Context) context.Context { return detachedContext{context.Background(), ctx} } +// GoDetached invokes the provided function in a goroutine where the context is detached. +func GoDetached(ctx context.Context, fun func(ctx context.Context)) { + go func() { + fun(Detach(ctx)) + }() +} + func (d detachedContext) Value(key interface{}) interface{} { return d.wrapped.Value(key) } diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index e2906ceb6..79407a187 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -505,36 +505,32 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs e.log.Debugf("generating range checkpoint") - // we're starting background work, ignore parent cancelation signal. - ctx = ctxutil.Detach(ctx) - e.backgroundWork.Add(1) - go func() { + // we're starting background work, ignore parent cancelation signal. + ctxutil.GoDetached(ctx, func(ctx context.Context) { defer e.backgroundWork.Done() if err := e.generateRangeCheckpointFromCommittedState(ctx, cs, firstNonRangeCompacted, latestSettled); err != nil { e.log.Errorf("unable to generate full checkpoint: %v, performance will be affected", err) } - }() + }) } func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) { } func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot) { - // we're starting background work, ignore parent cancelation signal. - ctx = ctxutil.Detach(ctx) - e.backgroundWork.Add(1) - go func() { + // we're starting background work, ignore parent cancelation signal. + ctxutil.GoDetached(ctx, func(ctx context.Context) { defer e.backgroundWork.Done() if err := e.cleanupInternal(ctx, cs); err != nil { e.log.Errorf("error cleaning up index blobs: %v, performance may be affected", err) } - }() + }) } func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[int][]blob.Metadata, error) { @@ -877,12 +873,10 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSna } if epochSettled { - // we're starting background work, ignore parent cancelation signal. - ctx = ctxutil.Detach(ctx) - e.backgroundWork.Add(1) - go func() { + // we're starting background work, ignore parent cancelation signal. + ctxutil.GoDetached(ctx, func(ctx context.Context) { defer e.backgroundWork.Done() e.log.Debugf("starting single-epoch compaction of %v", epoch) @@ -890,7 +884,7 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSna if err := e.compact(ctx, blob.IDsFromMetadata(uncompactedBlobs), compactedEpochBlobPrefix(epoch)); err != nil { e.log.Errorf("unable to compact blobs for epoch %v: %v, performance will be affected", epoch, err) } - }() + }) } // return uncompacted blobs to the caller while we're compacting them in background diff --git a/internal/repotesting/repotesting.go b/internal/repotesting/repotesting.go index fa7860d09..b5c014876 100644 --- a/internal/repotesting/repotesting.go +++ b/internal/repotesting/repotesting.go @@ -161,7 +161,7 @@ func (e *Environment) MustReopen(tb testing.TB, openOpts ...func(*repo.Options)) // ensure context passed to Open() is not used for cancelation signal. ctx2, cancel := context.WithCancel(ctx) - cancel() + defer cancel() rep, err := repo.Open(ctx2, e.ConfigFile(), e.Password, repoOptions(openOpts)) if err != nil { diff --git a/internal/server/server_test.go b/internal/server/server_test.go index f6e5a3b54..b0f94b228 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -110,13 +110,20 @@ func testServer(t *testing.T, disableGRPC bool) { apiServerInfo.DisableGRPC = disableGRPC - rep, err := repo.OpenAPIServer(ctx, apiServerInfo, repo.ClientOptions{ + ctx2, cancel := context.WithCancel(ctx) + + rep, err := repo.OpenAPIServer(ctx2, apiServerInfo, repo.ClientOptions{ Username: testUsername, Hostname: testHostname, }, &content.CachingOptions{ CacheDirectory: testutil.TempDirectory(t), MaxCacheSizeBytes: maxCacheSizeBytes, }, testPassword) + + // cancel immediately to ensure we did not spawn goroutines that depend on ctx inside + // repo.OpenAPIServer() + cancel() + if err != nil { t.Fatal(err) } diff --git a/repo/open.go b/repo/open.go index 000c5e59e..015332d78 100644 --- a/repo/open.go +++ b/repo/open.go @@ -15,7 +15,6 @@ "github.com/kopia/kopia/internal/atomicfile" "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/clock" - "github.com/kopia/kopia/internal/ctxutil" "github.com/kopia/kopia/internal/epoch" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/retry" @@ -90,10 +89,6 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r } }() - // ignore cancellation/timeout from the provided context, since we may be spawning - // goroutines that should survive when the provided context is done. - ctx = ctxutil.Detach(ctx) - if options == nil { options = &Options{} } diff --git a/tests/end_to_end_test/api_server_repository_test.go b/tests/end_to_end_test/api_server_repository_test.go index c4aa1f1ae..a181439a1 100644 --- a/tests/end_to_end_test/api_server_repository_test.go +++ b/tests/end_to_end_test/api_server_repository_test.go @@ -123,7 +123,8 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al waitUntilServerStarted(ctx, t, controlClient) // open repository client. - rep, err := repo.OpenAPIServer(ctx, &repo.APIServerInfo{ + ctx2, cancel := context.WithCancel(ctx) + rep, err := repo.OpenAPIServer(ctx2, &repo.APIServerInfo{ BaseURL: sp.BaseURL, TrustedServerCertificateFingerprint: sp.SHA256Fingerprint, DisableGRPC: !useGRPC, @@ -131,6 +132,11 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al Username: "foo", Hostname: "bar", }, nil, "baz") + + // cancel immediately to ensure we did not spawn goroutines that depend on ctx inside + // repo.OpenAPIServer() + cancel() + if err != nil { t.Fatal(err) }