mirror of
https://github.com/rclone/rclone.git
synced 2026-07-01 03:15:06 -04:00
mega: wait for server events after upload, delete and move
The mega backend keeps the whole account as an in-memory tree which go-mega reconciles asynchronously from the server's event stream. After an upload, delete or move the optimistic local update could be undone moments later when go-mega replayed the corresponding server event, re-adding a node to its parent. In a long-running process such as mount or serve this could briefly show a just-deleted file in a listing, or make a moved file reappear shortly afterwards. Wait for the server to confirm uploads, deletes and moves so the in-memory tree is settled before returning, matching what Rmdir and Purge already do. The move wait was also previously started after the server-side move so only the rename was covered. Add an internal test reproducing the lingering listing via a tight put/remove/list loop.
This commit is contained in:
@@ -770,6 +770,8 @@ func (f *Fs) move(ctx context.Context, dstRemote string, srcFs *Fs, srcRemote st
|
||||
return fmt.Errorf("server-side move failed to lookup src parent dir: %w", err)
|
||||
}
|
||||
|
||||
waitEvent := f.srv.WaitEventsStart()
|
||||
|
||||
// move the object into its new directory if required
|
||||
if srcDirNode != dstDirNode && srcDirNode.GetHash() != dstDirNode.GetHash() {
|
||||
//log.Printf("move src %p %q dst %p %q", srcDirNode, srcDirNode.GetName(), dstDirNode, dstDirNode.GetName())
|
||||
@@ -782,8 +784,6 @@ func (f *Fs) move(ctx context.Context, dstRemote string, srcFs *Fs, srcRemote st
|
||||
}
|
||||
}
|
||||
|
||||
waitEvent := f.srv.WaitEventsStart()
|
||||
|
||||
// rename the object if required
|
||||
if srcLeaf != dstLeaf {
|
||||
//log.Printf("rename %q to %q", srcLeaf, dstLeaf)
|
||||
@@ -1216,6 +1216,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
}
|
||||
}
|
||||
|
||||
waitEvent := o.fs.srv.WaitEventsStart()
|
||||
|
||||
// Finish the upload
|
||||
var info *mega.Node
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
@@ -1235,15 +1237,23 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
o.info = nil
|
||||
}
|
||||
|
||||
// Wait for the server to confirm the new file
|
||||
o.fs.srv.WaitEvents(waitEvent, eventWaitTime)
|
||||
|
||||
return o.setMetaData(info)
|
||||
}
|
||||
|
||||
// Remove an object
|
||||
func (o *Object) Remove(ctx context.Context) error {
|
||||
waitEvent := o.fs.srv.WaitEventsStart()
|
||||
|
||||
err := o.fs.deleteNode(ctx, o.info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Remove object failed: %w", err)
|
||||
}
|
||||
|
||||
// Wait for the server to confirm the deletion
|
||||
o.fs.srv.WaitEvents(waitEvent, eventWaitTime)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
63
backend/mega/mega_internal_test.go
Normal file
63
backend/mega/mega_internal_test.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package mega
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
"github.com/rclone/rclone/fstest/fstests"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// InternalTestGhostAfterRemove checks that a file removed in a long-running
|
||||
// process disappears from listings straight away rather than lingering as a
|
||||
// ghost until go-mega's next event poll reconciles the in-memory tree.
|
||||
//
|
||||
// The ghost is a race between the upload's server event being replayed and
|
||||
// the delete, so it puts, removes and lists in a tight loop to make it show
|
||||
// up reliably.
|
||||
func (f *Fs) InternalTestGhostAfterRemove(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Exercise the default soft-delete path - the hard_delete=true path
|
||||
// hits a separate bug in go-mega's Delete which leaves the node in its
|
||||
// parent's children regardless of what rclone does.
|
||||
if f.opt.HardDelete {
|
||||
oldHardDelete := f.opt.HardDelete
|
||||
f.opt.HardDelete = false
|
||||
defer func() { f.opt.HardDelete = oldHardDelete }()
|
||||
}
|
||||
|
||||
dir := "ghost-test"
|
||||
require.NoError(t, f.Mkdir(ctx, dir))
|
||||
defer func() { _ = f.Rmdir(ctx, dir) }()
|
||||
|
||||
contents := []byte("ghost test contents")
|
||||
for i := range 10 {
|
||||
remote := dir + "/test.txt"
|
||||
src := object.NewStaticObjectInfo(remote, time.Now(), int64(len(contents)), true, nil, nil)
|
||||
obj, err := f.Put(ctx, bytes.NewReader(contents), src)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, obj.Remove(ctx))
|
||||
|
||||
// The file must be gone from the listing straight away - if the
|
||||
// in-memory tree isn't settled it lingers as a ghost.
|
||||
entries, err := f.List(ctx, dir)
|
||||
require.NoError(t, err)
|
||||
names := make([]string, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
names = append(names, entry.Remote())
|
||||
}
|
||||
assert.NotContains(t, names, remote, "file should be gone immediately after remove (iteration %d)", i)
|
||||
}
|
||||
}
|
||||
|
||||
// InternalTest dispatches the backend specific internal tests
|
||||
func (f *Fs) InternalTest(t *testing.T) {
|
||||
t.Run("GhostAfterRemove", f.InternalTestGhostAfterRemove)
|
||||
}
|
||||
|
||||
var _ fstests.InternalTester = (*Fs)(nil)
|
||||
Reference in New Issue
Block a user