diff --git a/repo/blob/filesystem/filesystem_storage.go b/repo/blob/filesystem/filesystem_storage.go index 4e9a1e852..98acb8c3c 100644 --- a/repo/blob/filesystem/filesystem_storage.go +++ b/repo/blob/filesystem/filesystem_storage.go @@ -182,6 +182,10 @@ func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data return errors.Wrap(err, "can't write temporary file") } + if err = f.Sync(); err != nil { + return errors.Wrap(err, "can't sync temporary file data") + } + if err = f.Close(); err != nil { return errors.Wrap(err, "can't close temporary file") } diff --git a/repo/blob/filesystem/filesystem_storage_sync_test.go b/repo/blob/filesystem/filesystem_storage_sync_test.go new file mode 100644 index 000000000..548f657fc --- /dev/null +++ b/repo/blob/filesystem/filesystem_storage_sync_test.go @@ -0,0 +1,166 @@ +package filesystem + +import ( + "os" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/sharded" +) + +type verifySyncBeforeCloseFile struct { + osWriteFile // +checklocksignore set on instantiation + + notifyClose func() // +checklocksignore set on instantiation + notifyDirtyClose func() // +checklocksignore set on instantiation + + mu sync.Mutex + // +checklocks:mu + dirty bool +} + +func (vf *verifySyncBeforeCloseFile) Write(p []byte) (n int, err error) { + vf.mu.Lock() + defer vf.mu.Unlock() + + vf.dirty = true + + return vf.osWriteFile.Write(p) +} + +func (vf *verifySyncBeforeCloseFile) Sync() error { + vf.mu.Lock() + defer vf.mu.Unlock() + + err := vf.osWriteFile.Sync() + if err == nil { + vf.dirty = false + } + + return err +} + +func (vf *verifySyncBeforeCloseFile) Close() error { + vf.mu.Lock() + defer vf.mu.Unlock() + + err := vf.osWriteFile.Close() + + if vf.dirty { + vf.notifyDirtyClose() + } + + vf.notifyClose() + + return err +} + +type mockOSForSyncTest struct { + mockOS + + fileOpenCount atomic.Uint32 + fileCloseCount atomic.Uint32 + dirtyClose atomic.Bool +} + +func (osi *mockOSForSyncTest) Open(fname string) (osReadFile, error) { + f, err := osi.mockOS.Open(fname) + if err != nil { + return nil, err + } + + osi.fileOpenCount.Add(1) + + return f, nil +} + +func (osi *mockOSForSyncTest) CreateNewFile(fname string, perm os.FileMode) (osWriteFile, error) { + wf, err := osi.mockOS.CreateNewFile(fname, perm) + if err != nil { + return nil, err + } + + osi.fileOpenCount.Add(1) + + return &verifySyncBeforeCloseFile{ + osWriteFile: wf, + notifyClose: func() { osi.fileCloseCount.Add(1) }, + notifyDirtyClose: func() { osi.dirtyClose.Store(true) }, + }, nil +} + +// These tests reuse the retry/error-count mock to assert sync handling in PutBlob. +func TestPutBlob_SyncBeforeClose(t *testing.T) { + t.Parallel() + + ctx := testlogging.Context(t) + osi := &mockOSForSyncTest{ + mockOS: mockOS{ + osInterface: realOS{}, + }, + } + + st, err := New(ctx, &Options{ + Path: testutil.TempDirectory(t), + Options: sharded.Options{DirectoryShards: []int{1}}, + + osInterfaceOverride: osi, + }, true) + require.NoError(t, err) + + t.Cleanup(func() { _ = st.Close(ctx) }) + + err = st.PutBlob(ctx, "blob-sync-ok", gather.FromSlice([]byte("hello")), blob.PutOptions{}) + + require.False(t, osi.dirtyClose.Load(), "close called without calling sync after a write") + require.Equal(t, osi.fileOpenCount.Load(), osi.fileCloseCount.Load(), "calls to file.Close() must match number of opened files()") + require.NoError(t, err) + + var buf gather.WriteBuffer + t.Cleanup(buf.Close) + + err = st.GetBlob(ctx, "blob-sync-ok", 0, -1, &buf) + require.NoError(t, err) + require.Equal(t, []byte("hello"), buf.ToByteSlice()) +} + +func TestPutBlob_FailsOnSyncError(t *testing.T) { + t.Parallel() + + ctx := testlogging.Context(t) + dataDir := testutil.TempDirectory(t) + + osi := newMockOS() + + st, err := New(ctx, &Options{ + Path: dataDir, + Options: sharded.Options{DirectoryShards: []int{1}}, + + osInterfaceOverride: osi, + }, true) + require.NoError(t, err) + t.Cleanup(func() { _ = st.Close(ctx) }) + + // Test HACK: write a dummy blob to force writing the sharding configuration file, so writing the + // config file does not interfere with the test. While this is coupled to the specifics of the + // current implementation, it is required to be able to test the failure case. + err = st.PutBlob(ctx, "dummy", gather.FromSlice([]byte("hello")), blob.PutOptions{}) + require.NoError(t, err) + + // Inject a failure per create (re-)try, 10 is the default number of retries + osi.writeFileSyncRemainingErrors.Store(10) + + err = st.PutBlob(ctx, "blob-sync-fail", gather.FromSlice([]byte("hello")), blob.PutOptions{}) + require.Error(t, err) + require.ErrorContains(t, err, "can't sync temporary file data") + + _, err = st.GetMetadata(ctx, "blob-sync-fail") + require.ErrorIs(t, err, blob.ErrBlobNotFound) +} diff --git a/repo/blob/filesystem/osinterface.go b/repo/blob/filesystem/osinterface.go index c0fb25682..49e5f00a7 100644 --- a/repo/blob/filesystem/osinterface.go +++ b/repo/blob/filesystem/osinterface.go @@ -38,4 +38,6 @@ type osReadFile interface { type osWriteFile interface { io.WriteCloser + + Sync() error } diff --git a/repo/blob/filesystem/osinterface_mock_test.go b/repo/blob/filesystem/osinterface_mock_test.go index fc1a9d229..43070cbd6 100644 --- a/repo/blob/filesystem/osinterface_mock_test.go +++ b/repo/blob/filesystem/osinterface_mock_test.go @@ -13,9 +13,12 @@ var errNonRetriable = errors.New("some non-retriable error") type mockOS struct { + osInterface + readFileRemainingErrors atomic.Int32 writeFileRemainingErrors atomic.Int32 writeFileCloseRemainingErrors atomic.Int32 + writeFileSyncRemainingErrors atomic.Int32 createNewFileRemainingErrors atomic.Int32 mkdirAllRemainingErrors atomic.Int32 renameRemainingErrors atomic.Int32 @@ -34,8 +37,6 @@ type mockOS struct { // remaining syscall errnos //nolint:unused // Used with platform specific code eStaleRemainingErrors atomic.Int32 - - osInterface } func (osi *mockOS) Open(fname string) (osReadFile, error) { @@ -125,11 +126,15 @@ func (osi *mockOS) CreateNewFile(fname string, perm os.FileMode) (osWriteFile, e } if osi.writeFileRemainingErrors.Add(-1) >= 0 { - return writeFailureFile{wf}, nil + wf = writeFailureFile{wf} + } + + if osi.writeFileSyncRemainingErrors.Add(-1) >= 0 { + wf = syncFailureFile{wf} } if osi.writeFileCloseRemainingErrors.Add(-1) >= 0 { - return writeCloseFailureFile{wf}, nil + wf = writeCloseFailureFile{wf} } return wf, nil @@ -163,6 +168,14 @@ func (f writeFailureFile) Write(b []byte) (int, error) { return 0, &os.PathError{Op: "write", Err: errors.New("underlying problem")} } +type syncFailureFile struct { + osWriteFile +} + +func (f syncFailureFile) Sync() error { + return &os.PathError{Op: "fsync", Err: errors.New("sync failure")} +} + type writeCloseFailureFile struct { osWriteFile }