mirror of
https://github.com/kopia/kopia.git
synced 2026-05-14 09:47:35 -04:00
refactor(repository): better context cancelation handling (#1802)
Instead of ignoring context cancelation in Open(), ensure we don't spawn goroutines that might be canceled.
This commit is contained in:
@@ -21,6 +21,7 @@
|
||||
htpasswd "github.com/tg123/go-htpasswd"
|
||||
|
||||
"github.com/kopia/kopia/internal/auth"
|
||||
"github.com/kopia/kopia/internal/ctxutil"
|
||||
"github.com/kopia/kopia/internal/server"
|
||||
"github.com/kopia/kopia/repo"
|
||||
)
|
||||
@@ -222,12 +223,12 @@ func (c *commandServerStart) run(ctx context.Context) error {
|
||||
if c.serverStartShutdownWhenStdinClosed {
|
||||
log(ctx).Infof("Server will close when stdin is closed...")
|
||||
|
||||
go func() {
|
||||
ctxutil.GoDetached(ctx, func(ctx context.Context) {
|
||||
// consume all stdin and close the server when it closes
|
||||
io.ReadAll(os.Stdin) //nolint:errcheck
|
||||
log(ctx).Infof("Shutting down server...")
|
||||
httpServer.Shutdown(ctx) //nolint:errcheck
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
onExternalConfigReloadRequest(func() {
|
||||
|
||||
@@ -16,6 +16,13 @@ func Detach(ctx context.Context) context.Context {
|
||||
return detachedContext{context.Background(), ctx}
|
||||
}
|
||||
|
||||
// GoDetached invokes the provided function in a goroutine where the context is detached.
|
||||
func GoDetached(ctx context.Context, fun func(ctx context.Context)) {
|
||||
go func() {
|
||||
fun(Detach(ctx))
|
||||
}()
|
||||
}
|
||||
|
||||
func (d detachedContext) Value(key interface{}) interface{} {
|
||||
return d.wrapped.Value(key)
|
||||
}
|
||||
|
||||
@@ -505,36 +505,32 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs
|
||||
|
||||
e.log.Debugf("generating range checkpoint")
|
||||
|
||||
// we're starting background work, ignore parent cancelation signal.
|
||||
ctx = ctxutil.Detach(ctx)
|
||||
|
||||
e.backgroundWork.Add(1)
|
||||
|
||||
go func() {
|
||||
// we're starting background work, ignore parent cancelation signal.
|
||||
ctxutil.GoDetached(ctx, func(ctx context.Context) {
|
||||
defer e.backgroundWork.Done()
|
||||
|
||||
if err := e.generateRangeCheckpointFromCommittedState(ctx, cs, firstNonRangeCompacted, latestSettled); err != nil {
|
||||
e.log.Errorf("unable to generate full checkpoint: %v, performance will be affected", err)
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) {
|
||||
}
|
||||
|
||||
func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot) {
|
||||
// we're starting background work, ignore parent cancelation signal.
|
||||
ctx = ctxutil.Detach(ctx)
|
||||
|
||||
e.backgroundWork.Add(1)
|
||||
|
||||
go func() {
|
||||
// we're starting background work, ignore parent cancelation signal.
|
||||
ctxutil.GoDetached(ctx, func(ctx context.Context) {
|
||||
defer e.backgroundWork.Done()
|
||||
|
||||
if err := e.cleanupInternal(ctx, cs); err != nil {
|
||||
e.log.Errorf("error cleaning up index blobs: %v, performance may be affected", err)
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[int][]blob.Metadata, error) {
|
||||
@@ -877,12 +873,10 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSna
|
||||
}
|
||||
|
||||
if epochSettled {
|
||||
// we're starting background work, ignore parent cancelation signal.
|
||||
ctx = ctxutil.Detach(ctx)
|
||||
|
||||
e.backgroundWork.Add(1)
|
||||
|
||||
go func() {
|
||||
// we're starting background work, ignore parent cancelation signal.
|
||||
ctxutil.GoDetached(ctx, func(ctx context.Context) {
|
||||
defer e.backgroundWork.Done()
|
||||
|
||||
e.log.Debugf("starting single-epoch compaction of %v", epoch)
|
||||
@@ -890,7 +884,7 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSna
|
||||
if err := e.compact(ctx, blob.IDsFromMetadata(uncompactedBlobs), compactedEpochBlobPrefix(epoch)); err != nil {
|
||||
e.log.Errorf("unable to compact blobs for epoch %v: %v, performance will be affected", epoch, err)
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
// return uncompacted blobs to the caller while we're compacting them in background
|
||||
|
||||
@@ -161,7 +161,7 @@ func (e *Environment) MustReopen(tb testing.TB, openOpts ...func(*repo.Options))
|
||||
|
||||
// ensure context passed to Open() is not used for cancelation signal.
|
||||
ctx2, cancel := context.WithCancel(ctx)
|
||||
cancel()
|
||||
defer cancel()
|
||||
|
||||
rep, err := repo.Open(ctx2, e.ConfigFile(), e.Password, repoOptions(openOpts))
|
||||
if err != nil {
|
||||
|
||||
@@ -110,13 +110,20 @@ func testServer(t *testing.T, disableGRPC bool) {
|
||||
|
||||
apiServerInfo.DisableGRPC = disableGRPC
|
||||
|
||||
rep, err := repo.OpenAPIServer(ctx, apiServerInfo, repo.ClientOptions{
|
||||
ctx2, cancel := context.WithCancel(ctx)
|
||||
|
||||
rep, err := repo.OpenAPIServer(ctx2, apiServerInfo, repo.ClientOptions{
|
||||
Username: testUsername,
|
||||
Hostname: testHostname,
|
||||
}, &content.CachingOptions{
|
||||
CacheDirectory: testutil.TempDirectory(t),
|
||||
MaxCacheSizeBytes: maxCacheSizeBytes,
|
||||
}, testPassword)
|
||||
|
||||
// cancel immediately to ensure we did not spawn goroutines that depend on ctx inside
|
||||
// repo.OpenAPIServer()
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
"github.com/kopia/kopia/internal/atomicfile"
|
||||
"github.com/kopia/kopia/internal/cache"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/ctxutil"
|
||||
"github.com/kopia/kopia/internal/epoch"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/retry"
|
||||
@@ -90,10 +89,6 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r
|
||||
}
|
||||
}()
|
||||
|
||||
// ignore cancellation/timeout from the provided context, since we may be spawning
|
||||
// goroutines that should survive when the provided context is done.
|
||||
ctx = ctxutil.Detach(ctx)
|
||||
|
||||
if options == nil {
|
||||
options = &Options{}
|
||||
}
|
||||
|
||||
@@ -123,7 +123,8 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
|
||||
waitUntilServerStarted(ctx, t, controlClient)
|
||||
|
||||
// open repository client.
|
||||
rep, err := repo.OpenAPIServer(ctx, &repo.APIServerInfo{
|
||||
ctx2, cancel := context.WithCancel(ctx)
|
||||
rep, err := repo.OpenAPIServer(ctx2, &repo.APIServerInfo{
|
||||
BaseURL: sp.BaseURL,
|
||||
TrustedServerCertificateFingerprint: sp.SHA256Fingerprint,
|
||||
DisableGRPC: !useGRPC,
|
||||
@@ -131,6 +132,11 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
|
||||
Username: "foo",
|
||||
Hostname: "bar",
|
||||
}, nil, "baz")
|
||||
|
||||
// cancel immediately to ensure we did not spawn goroutines that depend on ctx inside
|
||||
// repo.OpenAPIServer()
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user