mirror of
https://github.com/kopia/kopia.git
synced 2026-05-09 15:23:02 -04:00
fix(repository): sync index blob file before closing (#5292)
- Sync index blob file before closing - Ensure file is closed even when there are errors - Remove temp file on error - Make writeTempFileAtomic testable - Test writeTempFile atomic
This commit is contained in:
@@ -10,7 +10,6 @@
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobparam"
|
||||
"github.com/kopia/kopia/internal/cache"
|
||||
"github.com/kopia/kopia/internal/contentlog"
|
||||
"github.com/kopia/kopia/internal/contentlog/logparam"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
@@ -95,31 +94,6 @@ func (c *diskCommittedContentIndexCache) addContentToCache(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeTempFileAtomic(dirname string, data []byte) (string, error) {
|
||||
// write to a temp file to avoid race where two processes are writing at the same time.
|
||||
tf, err := os.CreateTemp(dirname, "tmp")
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
os.MkdirAll(dirname, cache.DirMode) //nolint:errcheck
|
||||
tf, err = os.CreateTemp(dirname, "tmp")
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "can't create tmp file")
|
||||
}
|
||||
|
||||
if _, err := tf.Write(data); err != nil {
|
||||
return "", errors.Wrap(err, "can't write to temp file")
|
||||
}
|
||||
|
||||
if err := tf.Close(); err != nil {
|
||||
return "", errors.New("can't close tmp file")
|
||||
}
|
||||
|
||||
return tf.Name(), nil
|
||||
}
|
||||
|
||||
func (c *diskCommittedContentIndexCache) expireUnused(ctx context.Context, used []blob.ID) error {
|
||||
contentlog.Log2(ctx, c.log, "expireUnused",
|
||||
blobparam.BlobIDList("except", used),
|
||||
|
||||
86
repo/content/write_temp_file.go
Normal file
86
repo/content/write_temp_file.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package content
|
||||
|
||||
import (
|
||||
stderrors "errors"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/cache"
|
||||
)
|
||||
|
||||
type file interface {
|
||||
io.WriteCloser
|
||||
Name() string
|
||||
Sync() error
|
||||
}
|
||||
|
||||
type fsInterface interface {
|
||||
CreateTemp(dir, pattern string) (file, error)
|
||||
Remove(name string) error
|
||||
MkdirAll(path string, perm fs.FileMode) error
|
||||
}
|
||||
|
||||
type localFS struct{}
|
||||
|
||||
func (l localFS) CreateTemp(dir, pattern string) (file, error) {
|
||||
return os.CreateTemp(dir, pattern) //nolint:wrapcheck
|
||||
}
|
||||
|
||||
func (l localFS) Remove(name string) error {
|
||||
return os.Remove(name) //nolint:wrapcheck
|
||||
}
|
||||
|
||||
func (l localFS) MkdirAll(dirPath string, perm fs.FileMode) error {
|
||||
return os.MkdirAll(dirPath, perm) //nolint:wrapcheck
|
||||
}
|
||||
|
||||
func writeTempFileAtomic(dirname string, data []byte) (filename string, err error) {
|
||||
return writeTempFileAtomicImp(localFS{}, dirname, data)
|
||||
}
|
||||
|
||||
func writeTempFileAtomicImp(fsi fsInterface, dirname string, data []byte) (filename string, err error) {
|
||||
// write to a temp file to avoid race where two processes are writing at the same time.
|
||||
tf, err2 := fsi.CreateTemp(dirname, "tmp")
|
||||
if err2 != nil {
|
||||
if os.IsNotExist(err2) {
|
||||
if mdErr := fsi.MkdirAll(dirname, cache.DirMode); mdErr != nil {
|
||||
return "", stderrors.Join(errors.Wrap(mdErr, "cannot create parent directory for temp file"),
|
||||
errors.Wrap(err2, "cannot create temp file"))
|
||||
}
|
||||
|
||||
tf, err2 = fsi.CreateTemp(dirname, "tmp")
|
||||
}
|
||||
}
|
||||
|
||||
if err2 != nil {
|
||||
return "", errors.Wrap(err2, "can't create tmp file")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if cerr := tf.Close(); cerr != nil {
|
||||
err = stderrors.Join(err, errors.Wrap(cerr, "can't close tmp file"))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// remove tmp file on error to avoid leaving them behind
|
||||
if rerr := fsi.Remove(tf.Name()); rerr != nil {
|
||||
err = stderrors.Join(err, errors.Wrap(rerr, "can't remove tmp file"))
|
||||
}
|
||||
|
||||
filename = ""
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err2 := tf.Write(data); err2 != nil {
|
||||
return "", errors.Wrap(err2, "can't write to temp file")
|
||||
}
|
||||
|
||||
if err2 := tf.Sync(); err2 != nil {
|
||||
return "", errors.Wrapf(err2, "cannot sync temporary file in dir %s", dirname)
|
||||
}
|
||||
|
||||
return tf.Name(), nil
|
||||
}
|
||||
253
repo/content/write_temp_file_test.go
Normal file
253
repo/content/write_temp_file_test.go
Normal file
@@ -0,0 +1,253 @@
|
||||
package content
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestWriteTempFileAtomic_HappyPath verifies that writeTempFileAtomic writes
|
||||
// the expected content and returns a valid file path.
|
||||
func TestWriteTempFileAtomic_HappyPath(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir := t.TempDir()
|
||||
data := []byte("index-blob-content")
|
||||
|
||||
name, err := writeTempFileAtomicImp(localFS{}, dir, data)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, name)
|
||||
|
||||
// File must exist under the given directory.
|
||||
require.Equal(t, dir, filepath.Dir(name))
|
||||
|
||||
got, err := os.ReadFile(name)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, data, got)
|
||||
}
|
||||
|
||||
// TestWriteTempFileAtomic_EmptyData verifies that an empty payload is written
|
||||
// without error and produces a valid (zero-byte) file.
|
||||
func TestWriteTempFileAtomic_EmptyData(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir := t.TempDir()
|
||||
|
||||
name, err := writeTempFileAtomicImp(localFS{}, dir, []byte{})
|
||||
require.NoError(t, err)
|
||||
|
||||
info, err := os.Stat(name)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 0, info.Size())
|
||||
}
|
||||
|
||||
// TestWriteTempFileAtomic_CreatesDirectoryIfMissing verifies that
|
||||
// writeTempFileAtomic creates the target directory when it does not exist,
|
||||
// matching the MkdirAll fallback path.
|
||||
func TestWriteTempFileAtomic_CreatesDirectoryIfMissing(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Use a path that does not yet exist.
|
||||
dir := filepath.Join(t.TempDir(), "new", "nested", "dir")
|
||||
|
||||
data := []byte("hello")
|
||||
|
||||
name, err := writeTempFileAtomicImp(localFS{}, dir, data)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, dir, filepath.Dir(name))
|
||||
|
||||
got, err := os.ReadFile(name)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, data, got)
|
||||
}
|
||||
|
||||
// TestWriteTempFileAtomic_NonExistentDirUnwritable verifies that an error is
|
||||
// returned when the directory cannot be created (e.g. parent is read-only).
|
||||
// Skipped on platforms where root may bypass permissions.
|
||||
func TestWriteTempFileAtomic_NonExistentDirUnwritable(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("does not work on windows due to chmod")
|
||||
}
|
||||
|
||||
if os.Getuid() == 0 {
|
||||
t.Skip("skipping permission test when running as root")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
|
||||
// Create a read-only parent so that MkdirAll cannot create the child.
|
||||
parent := t.TempDir()
|
||||
require.NoError(t, os.Chmod(parent, 0o555))
|
||||
|
||||
t.Cleanup(func() { os.Chmod(parent, 0o755) }) //nolint:errcheck
|
||||
|
||||
dir := filepath.Join(parent, "child")
|
||||
|
||||
_, err := writeTempFileAtomicImp(localFS{}, dir, []byte("data"))
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "cannot create parent directory for temp file")
|
||||
}
|
||||
|
||||
type mockFileSynced struct {
|
||||
file
|
||||
|
||||
synced atomic.Bool
|
||||
}
|
||||
|
||||
func (mf *mockFileSynced) Write(p []byte) (n int, err error) {
|
||||
mf.synced.Store(false)
|
||||
|
||||
return mf.file.Write(p)
|
||||
}
|
||||
|
||||
func (mf *mockFileSynced) Sync() error {
|
||||
err := mf.file.Sync()
|
||||
if err == nil {
|
||||
mf.synced.Store(true)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// TestWriteTempFileAtomic_FileIsSynced verifies that Sync is called after
|
||||
// writing data to the temporary file.
|
||||
func TestWriteTempFileAtomic_FileIsSynced(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var mockedFile mockFileSynced
|
||||
|
||||
dir := t.TempDir()
|
||||
data := []byte("synced-content")
|
||||
|
||||
mfs := mockfs{
|
||||
createWrapper: func(f file) file {
|
||||
mockedFile.file = f
|
||||
|
||||
return &mockedFile
|
||||
},
|
||||
}
|
||||
|
||||
name, err := writeTempFileAtomicImp(mfs, dir, data)
|
||||
require.NoError(t, err)
|
||||
require.True(t, mockedFile.synced.Load())
|
||||
|
||||
// Open a new handle to avoid OS read-cache of the same descriptor.
|
||||
b, err := os.ReadFile(name)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, data, b)
|
||||
}
|
||||
|
||||
// TestWriteTempFileAtomic_NoTempFilesLeft verifies that writeTempFileAtomic
|
||||
// does not leak the temporary file after a successful call — the caller is
|
||||
// expected to rename it, but the file descriptor must already be closed.
|
||||
// We confirm this indirectly: the returned path must be stat-able (file
|
||||
// exists and is closed) with no other tmp* siblings beyond the returned one.
|
||||
func TestWriteTempFileAtomic_NoTempFilesLeft(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir := t.TempDir()
|
||||
|
||||
name, err := writeTempFileAtomicImp(localFS{}, dir, []byte("data"))
|
||||
require.NoError(t, err)
|
||||
|
||||
entries, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Only one file should exist: the one returned.
|
||||
require.Len(t, entries, 1)
|
||||
require.Equal(t, filepath.Base(name), entries[0].Name())
|
||||
}
|
||||
|
||||
type mockfs struct {
|
||||
localFS
|
||||
|
||||
createWrapper func(file) file
|
||||
}
|
||||
|
||||
func (m mockfs) CreateTemp(dir, pattern string) (file, error) {
|
||||
f, err := m.localFS.CreateTemp(dir, pattern)
|
||||
|
||||
if m.createWrapper != nil {
|
||||
f = m.createWrapper(f)
|
||||
}
|
||||
|
||||
return f, err
|
||||
}
|
||||
|
||||
type mockFileWriteError struct {
|
||||
file
|
||||
}
|
||||
|
||||
func (mf mockFileWriteError) Write(p []byte) (n int, err error) {
|
||||
return 0, errors.New("mock file write error")
|
||||
}
|
||||
|
||||
type mockFileSyncError struct {
|
||||
file
|
||||
}
|
||||
|
||||
func (mf mockFileSyncError) Sync() error {
|
||||
return errors.New("mock file sync error")
|
||||
}
|
||||
|
||||
type mockFileCloseError struct {
|
||||
file
|
||||
}
|
||||
|
||||
func (mf mockFileCloseError) Close() error {
|
||||
if err := mf.file.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errors.New("mock file close error")
|
||||
}
|
||||
|
||||
// TestWriteTempFileAtomic_NoTempFilesLeftOnError verifies that writeTempFileAtomic
|
||||
// does not leak the temporary file after a write, sync, or close error.
|
||||
func TestWriteTempFileAtomic_NoTempFilesLeftOnError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
mockfs
|
||||
description string
|
||||
}{
|
||||
{
|
||||
description: "write-error",
|
||||
mockfs: mockfs{
|
||||
createWrapper: func(f file) file { return mockFileWriteError{file: f} },
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "sync-error",
|
||||
mockfs: mockfs{
|
||||
createWrapper: func(f file) file { return mockFileSyncError{file: f} },
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "close-error",
|
||||
mockfs: mockfs{
|
||||
createWrapper: func(f file) file { return mockFileCloseError{file: f} },
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.description, func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
name, err := writeTempFileAtomicImp(c.mockfs, dir, []byte("data"))
|
||||
require.Error(t, err)
|
||||
require.Empty(t, name)
|
||||
t.Log("error:", err)
|
||||
|
||||
entries, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, entries)
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user