From 8ec295eddd2f61ec0a4bd628d6b5fa2f40f031ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julio=20L=C3=B3pez?= <1953782+julio-lopez@users.noreply.github.com> Date: Tue, 10 Feb 2026 21:07:48 -0800 Subject: [PATCH] fix(providers): sync file in FS provider to ensure data is persisted (#5150) Ensures the data is written to persistent storage before closing the file. - This prevents silently ignoring async I/O error that occur after the file handle has been closed, and thus not observed by the kopia process. - This prevents data loss cases of a client or server host crash where the file metadata was already persisted and the actual file data was not persisted yet. The actual behavior depends on the specific OS and file system implementation. For example, if no data has been written (flushed) to the device, the file size (metadata) may be 0, even after the subsequent rename. Either case above results in a potential repo corruption or data loss. These cases may occur more frequently with file systems accessed over the network, for example via NFS. Behavior change: a `Sync()` failure now aborts the write before close/ rename, so callers will see errors where previously the operation might have appeared successful, but risked silent data loss. --- repo/blob/filesystem/filesystem_storage.go | 4 + .../filesystem_storage_sync_test.go | 166 ++++++++++++++++++ repo/blob/filesystem/osinterface.go | 2 + repo/blob/filesystem/osinterface_mock_test.go | 21 ++- 4 files changed, 189 insertions(+), 4 deletions(-) create mode 100644 repo/blob/filesystem/filesystem_storage_sync_test.go diff --git a/repo/blob/filesystem/filesystem_storage.go b/repo/blob/filesystem/filesystem_storage.go index 4e9a1e852..98acb8c3c 100644 --- a/repo/blob/filesystem/filesystem_storage.go +++ b/repo/blob/filesystem/filesystem_storage.go @@ -182,6 +182,10 @@ func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data return errors.Wrap(err, "can't write temporary file") } + if err = f.Sync(); err != nil { + return errors.Wrap(err, "can't sync temporary file data") + } + if err = f.Close(); err != nil { return errors.Wrap(err, "can't close temporary file") } diff --git a/repo/blob/filesystem/filesystem_storage_sync_test.go b/repo/blob/filesystem/filesystem_storage_sync_test.go new file mode 100644 index 000000000..548f657fc --- /dev/null +++ b/repo/blob/filesystem/filesystem_storage_sync_test.go @@ -0,0 +1,166 @@ +package filesystem + +import ( + "os" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/sharded" +) + +type verifySyncBeforeCloseFile struct { + osWriteFile // +checklocksignore set on instantiation + + notifyClose func() // +checklocksignore set on instantiation + notifyDirtyClose func() // +checklocksignore set on instantiation + + mu sync.Mutex + // +checklocks:mu + dirty bool +} + +func (vf *verifySyncBeforeCloseFile) Write(p []byte) (n int, err error) { + vf.mu.Lock() + defer vf.mu.Unlock() + + vf.dirty = true + + return vf.osWriteFile.Write(p) +} + +func (vf *verifySyncBeforeCloseFile) Sync() error { + vf.mu.Lock() + defer vf.mu.Unlock() + + err := vf.osWriteFile.Sync() + if err == nil { + vf.dirty = false + } + + return err +} + +func (vf *verifySyncBeforeCloseFile) Close() error { + vf.mu.Lock() + defer vf.mu.Unlock() + + err := vf.osWriteFile.Close() + + if vf.dirty { + vf.notifyDirtyClose() + } + + vf.notifyClose() + + return err +} + +type mockOSForSyncTest struct { + mockOS + + fileOpenCount atomic.Uint32 + fileCloseCount atomic.Uint32 + dirtyClose atomic.Bool +} + +func (osi *mockOSForSyncTest) Open(fname string) (osReadFile, error) { + f, err := osi.mockOS.Open(fname) + if err != nil { + return nil, err + } + + osi.fileOpenCount.Add(1) + + return f, nil +} + +func (osi *mockOSForSyncTest) CreateNewFile(fname string, perm os.FileMode) (osWriteFile, error) { + wf, err := osi.mockOS.CreateNewFile(fname, perm) + if err != nil { + return nil, err + } + + osi.fileOpenCount.Add(1) + + return &verifySyncBeforeCloseFile{ + osWriteFile: wf, + notifyClose: func() { osi.fileCloseCount.Add(1) }, + notifyDirtyClose: func() { osi.dirtyClose.Store(true) }, + }, nil +} + +// These tests reuse the retry/error-count mock to assert sync handling in PutBlob. +func TestPutBlob_SyncBeforeClose(t *testing.T) { + t.Parallel() + + ctx := testlogging.Context(t) + osi := &mockOSForSyncTest{ + mockOS: mockOS{ + osInterface: realOS{}, + }, + } + + st, err := New(ctx, &Options{ + Path: testutil.TempDirectory(t), + Options: sharded.Options{DirectoryShards: []int{1}}, + + osInterfaceOverride: osi, + }, true) + require.NoError(t, err) + + t.Cleanup(func() { _ = st.Close(ctx) }) + + err = st.PutBlob(ctx, "blob-sync-ok", gather.FromSlice([]byte("hello")), blob.PutOptions{}) + + require.False(t, osi.dirtyClose.Load(), "close called without calling sync after a write") + require.Equal(t, osi.fileOpenCount.Load(), osi.fileCloseCount.Load(), "calls to file.Close() must match number of opened files()") + require.NoError(t, err) + + var buf gather.WriteBuffer + t.Cleanup(buf.Close) + + err = st.GetBlob(ctx, "blob-sync-ok", 0, -1, &buf) + require.NoError(t, err) + require.Equal(t, []byte("hello"), buf.ToByteSlice()) +} + +func TestPutBlob_FailsOnSyncError(t *testing.T) { + t.Parallel() + + ctx := testlogging.Context(t) + dataDir := testutil.TempDirectory(t) + + osi := newMockOS() + + st, err := New(ctx, &Options{ + Path: dataDir, + Options: sharded.Options{DirectoryShards: []int{1}}, + + osInterfaceOverride: osi, + }, true) + require.NoError(t, err) + t.Cleanup(func() { _ = st.Close(ctx) }) + + // Test HACK: write a dummy blob to force writing the sharding configuration file, so writing the + // config file does not interfere with the test. While this is coupled to the specifics of the + // current implementation, it is required to be able to test the failure case. + err = st.PutBlob(ctx, "dummy", gather.FromSlice([]byte("hello")), blob.PutOptions{}) + require.NoError(t, err) + + // Inject a failure per create (re-)try, 10 is the default number of retries + osi.writeFileSyncRemainingErrors.Store(10) + + err = st.PutBlob(ctx, "blob-sync-fail", gather.FromSlice([]byte("hello")), blob.PutOptions{}) + require.Error(t, err) + require.ErrorContains(t, err, "can't sync temporary file data") + + _, err = st.GetMetadata(ctx, "blob-sync-fail") + require.ErrorIs(t, err, blob.ErrBlobNotFound) +} diff --git a/repo/blob/filesystem/osinterface.go b/repo/blob/filesystem/osinterface.go index c0fb25682..49e5f00a7 100644 --- a/repo/blob/filesystem/osinterface.go +++ b/repo/blob/filesystem/osinterface.go @@ -38,4 +38,6 @@ type osReadFile interface { type osWriteFile interface { io.WriteCloser + + Sync() error } diff --git a/repo/blob/filesystem/osinterface_mock_test.go b/repo/blob/filesystem/osinterface_mock_test.go index fc1a9d229..43070cbd6 100644 --- a/repo/blob/filesystem/osinterface_mock_test.go +++ b/repo/blob/filesystem/osinterface_mock_test.go @@ -13,9 +13,12 @@ var errNonRetriable = errors.New("some non-retriable error") type mockOS struct { + osInterface + readFileRemainingErrors atomic.Int32 writeFileRemainingErrors atomic.Int32 writeFileCloseRemainingErrors atomic.Int32 + writeFileSyncRemainingErrors atomic.Int32 createNewFileRemainingErrors atomic.Int32 mkdirAllRemainingErrors atomic.Int32 renameRemainingErrors atomic.Int32 @@ -34,8 +37,6 @@ type mockOS struct { // remaining syscall errnos //nolint:unused // Used with platform specific code eStaleRemainingErrors atomic.Int32 - - osInterface } func (osi *mockOS) Open(fname string) (osReadFile, error) { @@ -125,11 +126,15 @@ func (osi *mockOS) CreateNewFile(fname string, perm os.FileMode) (osWriteFile, e } if osi.writeFileRemainingErrors.Add(-1) >= 0 { - return writeFailureFile{wf}, nil + wf = writeFailureFile{wf} + } + + if osi.writeFileSyncRemainingErrors.Add(-1) >= 0 { + wf = syncFailureFile{wf} } if osi.writeFileCloseRemainingErrors.Add(-1) >= 0 { - return writeCloseFailureFile{wf}, nil + wf = writeCloseFailureFile{wf} } return wf, nil @@ -163,6 +168,14 @@ func (f writeFailureFile) Write(b []byte) (int, error) { return 0, &os.PathError{Op: "write", Err: errors.New("underlying problem")} } +type syncFailureFile struct { + osWriteFile +} + +func (f syncFailureFile) Sync() error { + return &os.PathError{Op: "fsync", Err: errors.New("sync failure")} +} + type writeCloseFailureFile struct { osWriteFile }