mirror of
https://github.com/kopia/kopia.git
synced 2026-03-25 09:31:12 -04:00
fix(providers): cleanup temporary file on put blob failure (#5157)
Ensure that temporary files are removed when there is an error writing the file. Refactor PutBlobInPath to extract temp file creation into separate `createTempFileWithData` function that handles: - Temporary file creation with random suffix - Writing data to the file - Syncing the file - Closing the file - Removing temp file on any write/sync/close error Added unit tests for error handling scenarios
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
stderrors "errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@@ -154,7 +155,7 @@ func (fs *fsImpl) GetMetadataFromPath(ctx context.Context, dirPath, path string)
|
||||
}, fs.isRetriable)
|
||||
}
|
||||
|
||||
//nolint:wrapcheck,gocyclo
|
||||
//nolint:wrapcheck
|
||||
func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data blob.Bytes, opts blob.PutOptions) error {
|
||||
_ = dirPath
|
||||
|
||||
@@ -166,28 +167,9 @@ func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data
|
||||
}
|
||||
|
||||
return retry.WithExponentialBackoffNoValue(ctx, "PutBlobInPath:"+path, func() error {
|
||||
randSuffix := make([]byte, tempFileRandomSuffixLen)
|
||||
if _, err := rand.Read(randSuffix); err != nil {
|
||||
return errors.Wrap(err, "can't get random bytes")
|
||||
}
|
||||
|
||||
tempFile := fmt.Sprintf("%s.tmp.%x", path, randSuffix)
|
||||
|
||||
f, err := fs.createTempFileAndDir(tempFile)
|
||||
tempFile, err := fs.createTempFileWithData(path, data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot create temporary file")
|
||||
}
|
||||
|
||||
if _, err = data.WriteTo(f); err != nil {
|
||||
return errors.Wrap(err, "can't write temporary file")
|
||||
}
|
||||
|
||||
if err = f.Sync(); err != nil {
|
||||
return errors.Wrap(err, "can't sync temporary file data")
|
||||
}
|
||||
|
||||
if err = f.Close(); err != nil {
|
||||
return errors.Wrap(err, "can't close temporary file")
|
||||
return err
|
||||
}
|
||||
|
||||
err = fs.osi.Rename(tempFile, path)
|
||||
@@ -225,6 +207,50 @@ func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data
|
||||
}, fs.isRetriable)
|
||||
}
|
||||
|
||||
// createTempFileWithData creates a temporary file, writes data to it, syncs and closes it.
|
||||
// Returns the name of the temporary file and an error.
|
||||
// If there is an error writing, syncing, or closing the file, the temporary file is removed.
|
||||
func (fs *fsImpl) createTempFileWithData(path string, data blob.Bytes) (name string, err error) {
|
||||
randSuffix := make([]byte, tempFileRandomSuffixLen)
|
||||
if _, err := rand.Read(randSuffix); err != nil {
|
||||
return "", errors.Wrap(err, "can't get random bytes for temporary filename")
|
||||
}
|
||||
|
||||
tempFile := fmt.Sprintf("%s.tmp.%x", path, randSuffix)
|
||||
|
||||
f, err := fs.createTempFileAndDir(tempFile)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "cannot create temporary file")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if closeErr := f.Close(); closeErr != nil {
|
||||
err = stderrors.Join(err, errors.Wrap(closeErr, "can't close temporary file"))
|
||||
}
|
||||
|
||||
// remove temp file when any of the operations fail
|
||||
if err != nil {
|
||||
name = ""
|
||||
|
||||
if removeErr := fs.osi.Remove(tempFile); removeErr != nil {
|
||||
err = stderrors.Join(err, errors.Wrap(removeErr, "can't remove temp file after error"))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err = data.WriteTo(f); err != nil {
|
||||
return "", errors.Wrap(err, "can't write temporary file")
|
||||
}
|
||||
|
||||
if err = f.Sync(); err != nil {
|
||||
return "", errors.Wrap(err, "can't sync temporary file data")
|
||||
}
|
||||
|
||||
// f closed in deferred cleanup function
|
||||
|
||||
return tempFile, nil
|
||||
}
|
||||
|
||||
func (fs *fsImpl) createTempFileAndDir(tempFile string) (osWriteFile, error) {
|
||||
f, err := fs.osi.CreateNewFile(tempFile, fs.fileMode())
|
||||
if fs.osi.IsNotExist(err) {
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -496,3 +498,155 @@ func newMockOS() *mockOS {
|
||||
osInterface: realOS{},
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileStorage_CreateTempFileWithData_Success(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
dataDir := testutil.TempDirectory(t)
|
||||
|
||||
st, err := New(ctx, &Options{
|
||||
Path: dataDir,
|
||||
Options: sharded.Options{
|
||||
DirectoryShards: []int{5, 2},
|
||||
},
|
||||
}, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, st.Close(ctx))
|
||||
})
|
||||
|
||||
data := gather.FromSlice([]byte{1, 2, 3, 4, 5})
|
||||
testPath := filepath.Join(dataDir, "someb", "lo", "b1234567812345678.f")
|
||||
tempFile, err := asFsImpl(t, st).createTempFileWithData(testPath, data)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, tempFile)
|
||||
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, os.Remove(tempFile))
|
||||
})
|
||||
|
||||
require.Contains(t, tempFile, ".tmp.")
|
||||
|
||||
// Verify temp file exists and has correct content
|
||||
content, err := os.ReadFile(tempFile)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte{1, 2, 3, 4, 5}, content)
|
||||
}
|
||||
|
||||
func TestFileStorage_CreateTempFileWithData_WriteError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
dataDir := testutil.TempDirectory(t)
|
||||
|
||||
osi := newMockOS()
|
||||
osi.writeFileRemainingErrors.Store(1)
|
||||
|
||||
st, err := New(ctx, &Options{
|
||||
Path: dataDir,
|
||||
Options: sharded.Options{
|
||||
DirectoryShards: []int{5, 2},
|
||||
},
|
||||
osInterfaceOverride: osi,
|
||||
}, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, st.Close(ctx))
|
||||
})
|
||||
|
||||
data := gather.FromSlice([]byte{1, 2, 3, 4, 5})
|
||||
testPath := filepath.Join(dataDir, "someb", "lo", "b1234567812345678.f")
|
||||
tempFile, err := asFsImpl(t, st).createTempFileWithData(testPath, data)
|
||||
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "can't write temporary file")
|
||||
require.Empty(t, tempFile)
|
||||
|
||||
// Verify temp file was removed (doesn't exist). There should be no other
|
||||
// blobs with the same prefix, so listing blobs should return 0 entries.
|
||||
verifyEmptyDir(t, filepath.Join(dataDir, "someb", "lo"))
|
||||
}
|
||||
|
||||
func TestFileStorage_CreateTempFileWithData_SyncError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
dataDir := testutil.TempDirectory(t)
|
||||
|
||||
osi := newMockOS()
|
||||
osi.writeFileSyncRemainingErrors.Store(1)
|
||||
|
||||
st, err := New(ctx, &Options{
|
||||
Path: dataDir,
|
||||
Options: sharded.Options{
|
||||
DirectoryShards: []int{5, 2},
|
||||
},
|
||||
osInterfaceOverride: osi,
|
||||
}, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, st.Close(ctx))
|
||||
})
|
||||
|
||||
data := gather.FromSlice([]byte{1, 2, 3, 4, 5})
|
||||
testPath := filepath.Join(dataDir, "someb", "lo", "b1234567812345678.f")
|
||||
tempFile, err := asFsImpl(t, st).createTempFileWithData(testPath, data)
|
||||
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "can't sync temporary file data")
|
||||
require.Empty(t, tempFile)
|
||||
|
||||
verifyEmptyDir(t, filepath.Join(dataDir, "someb", "lo"))
|
||||
}
|
||||
|
||||
func TestFileStorage_CreateTempFileWithData_CloseError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
dataDir := testutil.TempDirectory(t)
|
||||
|
||||
osi := newMockOS()
|
||||
osi.writeFileCloseRemainingErrors.Store(1)
|
||||
|
||||
st, err := New(ctx, &Options{
|
||||
Path: dataDir,
|
||||
Options: sharded.Options{
|
||||
DirectoryShards: []int{5, 2},
|
||||
},
|
||||
osInterfaceOverride: osi,
|
||||
}, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, st.Close(ctx))
|
||||
})
|
||||
|
||||
data := gather.FromSlice([]byte{1, 2, 3, 4, 5})
|
||||
testPath := filepath.Join(dataDir, "someb", "lo", "b1234567812345678.f")
|
||||
tempFile, err := asFsImpl(t, st).createTempFileWithData(testPath, data)
|
||||
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "can't close temporary file")
|
||||
require.Empty(t, tempFile)
|
||||
|
||||
// Skip this check on Windows because the file cannot be removed because it
|
||||
// is still open, since there was an error closing it.
|
||||
if runtime.GOOS != "windows" {
|
||||
verifyEmptyDir(t, filepath.Join(dataDir, "someb", "lo"))
|
||||
}
|
||||
}
|
||||
|
||||
func verifyEmptyDir(t *testing.T, dir string) {
|
||||
t.Helper()
|
||||
|
||||
entries, err := os.ReadDir(dir)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, entries)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user