From e1dce8a049c994eaf5f3ace5fdc53aa2c66886e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julio=20L=C3=B3pez?= <1953782+julio-lopez@users.noreply.github.com> Date: Thu, 9 Apr 2026 10:26:02 -0700 Subject: [PATCH] fix(repository): sync index blob file before closing (#5292) - Sync index blob file before closing - Ensure file is closed even when there are errors - Remove temp file on error - Make writeTempFileAtomic testable - Test writeTempFile atomic --- .../committed_content_index_disk_cache.go | 26 -- repo/content/write_temp_file.go | 86 ++++++ repo/content/write_temp_file_test.go | 253 ++++++++++++++++++ 3 files changed, 339 insertions(+), 26 deletions(-) create mode 100644 repo/content/write_temp_file.go create mode 100644 repo/content/write_temp_file_test.go diff --git a/repo/content/committed_content_index_disk_cache.go b/repo/content/committed_content_index_disk_cache.go index 6a65908c9..d4aa2d272 100644 --- a/repo/content/committed_content_index_disk_cache.go +++ b/repo/content/committed_content_index_disk_cache.go @@ -10,7 +10,6 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/blobparam" - "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/contentlog" "github.com/kopia/kopia/internal/contentlog/logparam" "github.com/kopia/kopia/internal/gather" @@ -95,31 +94,6 @@ func (c *diskCommittedContentIndexCache) addContentToCache(ctx context.Context, return nil } -func writeTempFileAtomic(dirname string, data []byte) (string, error) { - // write to a temp file to avoid race where two processes are writing at the same time. - tf, err := os.CreateTemp(dirname, "tmp") - if err != nil { - if os.IsNotExist(err) { - os.MkdirAll(dirname, cache.DirMode) //nolint:errcheck - tf, err = os.CreateTemp(dirname, "tmp") - } - } - - if err != nil { - return "", errors.Wrap(err, "can't create tmp file") - } - - if _, err := tf.Write(data); err != nil { - return "", errors.Wrap(err, "can't write to temp file") - } - - if err := tf.Close(); err != nil { - return "", errors.New("can't close tmp file") - } - - return tf.Name(), nil -} - func (c *diskCommittedContentIndexCache) expireUnused(ctx context.Context, used []blob.ID) error { contentlog.Log2(ctx, c.log, "expireUnused", blobparam.BlobIDList("except", used), diff --git a/repo/content/write_temp_file.go b/repo/content/write_temp_file.go new file mode 100644 index 000000000..78d7648c4 --- /dev/null +++ b/repo/content/write_temp_file.go @@ -0,0 +1,86 @@ +package content + +import ( + stderrors "errors" + "io" + "io/fs" + "os" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/cache" +) + +type file interface { + io.WriteCloser + Name() string + Sync() error +} + +type fsInterface interface { + CreateTemp(dir, pattern string) (file, error) + Remove(name string) error + MkdirAll(path string, perm fs.FileMode) error +} + +type localFS struct{} + +func (l localFS) CreateTemp(dir, pattern string) (file, error) { + return os.CreateTemp(dir, pattern) //nolint:wrapcheck +} + +func (l localFS) Remove(name string) error { + return os.Remove(name) //nolint:wrapcheck +} + +func (l localFS) MkdirAll(dirPath string, perm fs.FileMode) error { + return os.MkdirAll(dirPath, perm) //nolint:wrapcheck +} + +func writeTempFileAtomic(dirname string, data []byte) (filename string, err error) { + return writeTempFileAtomicImp(localFS{}, dirname, data) +} + +func writeTempFileAtomicImp(fsi fsInterface, dirname string, data []byte) (filename string, err error) { + // write to a temp file to avoid race where two processes are writing at the same time. + tf, err2 := fsi.CreateTemp(dirname, "tmp") + if err2 != nil { + if os.IsNotExist(err2) { + if mdErr := fsi.MkdirAll(dirname, cache.DirMode); mdErr != nil { + return "", stderrors.Join(errors.Wrap(mdErr, "cannot create parent directory for temp file"), + errors.Wrap(err2, "cannot create temp file")) + } + + tf, err2 = fsi.CreateTemp(dirname, "tmp") + } + } + + if err2 != nil { + return "", errors.Wrap(err2, "can't create tmp file") + } + + defer func() { + if cerr := tf.Close(); cerr != nil { + err = stderrors.Join(err, errors.Wrap(cerr, "can't close tmp file")) + } + + if err != nil { + // remove tmp file on error to avoid leaving them behind + if rerr := fsi.Remove(tf.Name()); rerr != nil { + err = stderrors.Join(err, errors.Wrap(rerr, "can't remove tmp file")) + } + + filename = "" + } + }() + + if _, err2 := tf.Write(data); err2 != nil { + return "", errors.Wrap(err2, "can't write to temp file") + } + + if err2 := tf.Sync(); err2 != nil { + return "", errors.Wrapf(err2, "cannot sync temporary file in dir %s", dirname) + } + + return tf.Name(), nil +} diff --git a/repo/content/write_temp_file_test.go b/repo/content/write_temp_file_test.go new file mode 100644 index 000000000..e57bd9a3c --- /dev/null +++ b/repo/content/write_temp_file_test.go @@ -0,0 +1,253 @@ +package content + +import ( + "os" + "path/filepath" + "runtime" + "sync/atomic" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +// TestWriteTempFileAtomic_HappyPath verifies that writeTempFileAtomic writes +// the expected content and returns a valid file path. +func TestWriteTempFileAtomic_HappyPath(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + data := []byte("index-blob-content") + + name, err := writeTempFileAtomicImp(localFS{}, dir, data) + require.NoError(t, err) + require.NotEmpty(t, name) + + // File must exist under the given directory. + require.Equal(t, dir, filepath.Dir(name)) + + got, err := os.ReadFile(name) + require.NoError(t, err) + require.Equal(t, data, got) +} + +// TestWriteTempFileAtomic_EmptyData verifies that an empty payload is written +// without error and produces a valid (zero-byte) file. +func TestWriteTempFileAtomic_EmptyData(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + + name, err := writeTempFileAtomicImp(localFS{}, dir, []byte{}) + require.NoError(t, err) + + info, err := os.Stat(name) + require.NoError(t, err) + require.EqualValues(t, 0, info.Size()) +} + +// TestWriteTempFileAtomic_CreatesDirectoryIfMissing verifies that +// writeTempFileAtomic creates the target directory when it does not exist, +// matching the MkdirAll fallback path. +func TestWriteTempFileAtomic_CreatesDirectoryIfMissing(t *testing.T) { + t.Parallel() + + // Use a path that does not yet exist. + dir := filepath.Join(t.TempDir(), "new", "nested", "dir") + + data := []byte("hello") + + name, err := writeTempFileAtomicImp(localFS{}, dir, data) + require.NoError(t, err) + require.Equal(t, dir, filepath.Dir(name)) + + got, err := os.ReadFile(name) + require.NoError(t, err) + require.Equal(t, data, got) +} + +// TestWriteTempFileAtomic_NonExistentDirUnwritable verifies that an error is +// returned when the directory cannot be created (e.g. parent is read-only). +// Skipped on platforms where root may bypass permissions. +func TestWriteTempFileAtomic_NonExistentDirUnwritable(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("does not work on windows due to chmod") + } + + if os.Getuid() == 0 { + t.Skip("skipping permission test when running as root") + } + + t.Parallel() + + // Create a read-only parent so that MkdirAll cannot create the child. + parent := t.TempDir() + require.NoError(t, os.Chmod(parent, 0o555)) + + t.Cleanup(func() { os.Chmod(parent, 0o755) }) //nolint:errcheck + + dir := filepath.Join(parent, "child") + + _, err := writeTempFileAtomicImp(localFS{}, dir, []byte("data")) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot create parent directory for temp file") +} + +type mockFileSynced struct { + file + + synced atomic.Bool +} + +func (mf *mockFileSynced) Write(p []byte) (n int, err error) { + mf.synced.Store(false) + + return mf.file.Write(p) +} + +func (mf *mockFileSynced) Sync() error { + err := mf.file.Sync() + if err == nil { + mf.synced.Store(true) + } + + return err +} + +// TestWriteTempFileAtomic_FileIsSynced verifies that Sync is called after +// writing data to the temporary file. +func TestWriteTempFileAtomic_FileIsSynced(t *testing.T) { + t.Parallel() + + var mockedFile mockFileSynced + + dir := t.TempDir() + data := []byte("synced-content") + + mfs := mockfs{ + createWrapper: func(f file) file { + mockedFile.file = f + + return &mockedFile + }, + } + + name, err := writeTempFileAtomicImp(mfs, dir, data) + require.NoError(t, err) + require.True(t, mockedFile.synced.Load()) + + // Open a new handle to avoid OS read-cache of the same descriptor. + b, err := os.ReadFile(name) + require.NoError(t, err) + require.Equal(t, data, b) +} + +// TestWriteTempFileAtomic_NoTempFilesLeft verifies that writeTempFileAtomic +// does not leak the temporary file after a successful call — the caller is +// expected to rename it, but the file descriptor must already be closed. +// We confirm this indirectly: the returned path must be stat-able (file +// exists and is closed) with no other tmp* siblings beyond the returned one. +func TestWriteTempFileAtomic_NoTempFilesLeft(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + + name, err := writeTempFileAtomicImp(localFS{}, dir, []byte("data")) + require.NoError(t, err) + + entries, err := os.ReadDir(dir) + require.NoError(t, err) + + // Only one file should exist: the one returned. + require.Len(t, entries, 1) + require.Equal(t, filepath.Base(name), entries[0].Name()) +} + +type mockfs struct { + localFS + + createWrapper func(file) file +} + +func (m mockfs) CreateTemp(dir, pattern string) (file, error) { + f, err := m.localFS.CreateTemp(dir, pattern) + + if m.createWrapper != nil { + f = m.createWrapper(f) + } + + return f, err +} + +type mockFileWriteError struct { + file +} + +func (mf mockFileWriteError) Write(p []byte) (n int, err error) { + return 0, errors.New("mock file write error") +} + +type mockFileSyncError struct { + file +} + +func (mf mockFileSyncError) Sync() error { + return errors.New("mock file sync error") +} + +type mockFileCloseError struct { + file +} + +func (mf mockFileCloseError) Close() error { + if err := mf.file.Close(); err != nil { + return err + } + + return errors.New("mock file close error") +} + +// TestWriteTempFileAtomic_NoTempFilesLeftOnError verifies that writeTempFileAtomic +// does not leak the temporary file after a write, sync, or close error. +func TestWriteTempFileAtomic_NoTempFilesLeftOnError(t *testing.T) { + t.Parallel() + + cases := []struct { + mockfs + description string + }{ + { + description: "write-error", + mockfs: mockfs{ + createWrapper: func(f file) file { return mockFileWriteError{file: f} }, + }, + }, + { + description: "sync-error", + mockfs: mockfs{ + createWrapper: func(f file) file { return mockFileSyncError{file: f} }, + }, + }, + { + description: "close-error", + mockfs: mockfs{ + createWrapper: func(f file) file { return mockFileCloseError{file: f} }, + }, + }, + } + + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + dir := t.TempDir() + + name, err := writeTempFileAtomicImp(c.mockfs, dir, []byte("data")) + require.Error(t, err) + require.Empty(t, name) + t.Log("error:", err) + + entries, err := os.ReadDir(dir) + require.NoError(t, err) + require.Empty(t, entries) + }) + } +}