mirror of
https://github.com/rclone/rclone.git
synced 2026-05-13 19:04:17 -04:00
hasher: fix Update not storing hashes in bolt DB after file replacement
When rclone sync replaced an existing file, it called Update which pruned the old hash but never computed or stored the new one. This left the file with no hash entry in the bolt DB. This applies the same hashing logic in Put to Update: compute hashes during the transfer via a hashingReader and store them afterwards. The common hash-wrapping and hash-storing logic was extracted into a function. Fixes #9308
This commit is contained in:
@@ -69,12 +69,66 @@ func (f *Fs) testUploadFromCrypt(t *testing.T) {
|
||||
_ = operations.Purge(ctx, f, dirName)
|
||||
}
|
||||
|
||||
func (f *Fs) testUpdateStoresHash(t *testing.T) {
|
||||
// make a temporary local remote
|
||||
tempRoot, err := fstest.LocalRemote()
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
_ = os.RemoveAll(tempRoot)
|
||||
}()
|
||||
|
||||
// make a temporary crypt remote as source
|
||||
ctx := context.Background()
|
||||
pass := obscure.MustObscure("crypt")
|
||||
remote := fmt.Sprintf(`:crypt,remote="%s",password="%s":`, tempRoot, pass)
|
||||
cryptFs, err := fs.NewFs(ctx, remote)
|
||||
require.NoError(t, err)
|
||||
|
||||
const dirName = "update_hash_1"
|
||||
const fileName = dirName + "/file_update_1"
|
||||
const longTime = fs.ModTimeNotSupported
|
||||
hashType := f.keepHashes.GetOne()
|
||||
|
||||
// upload initial file to hasher via Put
|
||||
src1 := putFile(ctx, t, cryptFs, fileName, "initial content")
|
||||
in1, err := src1.Open(ctx)
|
||||
require.NoError(t, err)
|
||||
dst, err := f.Put(ctx, in1, src1)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, dst)
|
||||
|
||||
// verify hash was stored after Put
|
||||
var hash1 string
|
||||
if f.opt.MaxAge > 0 {
|
||||
hash1, err = f.getRawHash(ctx, hashType, fileName, anyFingerprint, longTime)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, hash1)
|
||||
}
|
||||
|
||||
// update the file with new content via Update (this is what sync does for replacements)
|
||||
src2 := putFile(ctx, t, cryptFs, fileName, "updated content")
|
||||
in2, err := src2.Open(ctx)
|
||||
require.NoError(t, err)
|
||||
err = dst.Update(ctx, in2, src2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// verify hash was stored after Update and is different from the original
|
||||
if f.opt.MaxAge > 0 {
|
||||
hash2, err := f.getRawHash(ctx, hashType, fileName, anyFingerprint, longTime)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, hash2, "hash should be stored after Update")
|
||||
assert.NotEqual(t, hash1, hash2, "hash should change after Update with different content")
|
||||
}
|
||||
_ = operations.Purge(ctx, f, dirName)
|
||||
}
|
||||
|
||||
// InternalTest dispatches all internal tests
|
||||
func (f *Fs) InternalTest(t *testing.T) {
|
||||
if !kv.Supported() {
|
||||
t.Skip("hasher is not supported on this OS")
|
||||
}
|
||||
t.Run("UploadFromCrypt", f.testUploadFromCrypt)
|
||||
t.Run("UpdateStoresHash", f.testUpdateStoresHash)
|
||||
}
|
||||
|
||||
var _ fstests.InternalTester = (*Fs)(nil)
|
||||
|
||||
@@ -131,10 +131,62 @@ func (o *Object) updateHashes(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// wrapInHashingReader wraps in with a hashing reader if needed and
|
||||
// returns the (possibly wrapped) reader. After the transfer completes
|
||||
// successfully, call the returned storeHashes function to store the
|
||||
// computed hashes.
|
||||
func (f *Fs) wrapInHashingReader(ctx context.Context, in io.Reader, src fs.ObjectInfo) (wrapIn io.Reader, storeHashes func(o *Object)) {
|
||||
var (
|
||||
common hash.Set
|
||||
rehash bool
|
||||
hashes hashMap
|
||||
)
|
||||
if fsrc := src.Fs(); fsrc != nil {
|
||||
common = fsrc.Hashes().Overlap(f.keepHashes)
|
||||
// Rehash if source does not have all required hashes or hashing is slow
|
||||
rehash = fsrc.Features().SlowHash || common != f.keepHashes
|
||||
}
|
||||
|
||||
wrapIn = in
|
||||
if rehash {
|
||||
r, err := f.newHashingReader(ctx, in, func(sums hashMap) {
|
||||
hashes = sums
|
||||
})
|
||||
fs.Debugf(src, "Rehash in-fly due to incomplete or slow source set %v (err: %v)", common, err)
|
||||
if err == nil {
|
||||
wrapIn = r
|
||||
} else {
|
||||
rehash = false
|
||||
}
|
||||
}
|
||||
|
||||
storeHashes = func(o *Object) {
|
||||
if !rehash {
|
||||
hashes = hashMap{}
|
||||
for _, ht := range common.Array() {
|
||||
if h, e := src.Hash(ctx, ht); e == nil && h != "" {
|
||||
hashes[ht] = h
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(hashes) > 0 {
|
||||
err := o.putHashes(ctx, hashes)
|
||||
fs.Debugf(o, "Applied %d source hashes, err: %v", len(hashes), err)
|
||||
}
|
||||
}
|
||||
return wrapIn, storeHashes
|
||||
}
|
||||
|
||||
// Update the object with the given data, time and size.
|
||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||
wrapIn, storeHashes := o.f.wrapInHashingReader(ctx, in, src)
|
||||
_ = o.f.pruneHash(src.Remote())
|
||||
return o.Object.Update(ctx, in, src, options...)
|
||||
err := o.Object.Update(ctx, wrapIn, src, options...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
storeHashes(o)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove an object.
|
||||
@@ -189,51 +241,15 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (r io.ReadC
|
||||
|
||||
// Put data into the remote path with given modTime and size
|
||||
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||
var (
|
||||
o fs.Object
|
||||
common hash.Set
|
||||
rehash bool
|
||||
hashes hashMap
|
||||
)
|
||||
if fsrc := src.Fs(); fsrc != nil {
|
||||
common = fsrc.Hashes().Overlap(f.keepHashes)
|
||||
// Rehash if source does not have all required hashes or hashing is slow
|
||||
rehash = fsrc.Features().SlowHash || common != f.keepHashes
|
||||
}
|
||||
|
||||
wrapIn := in
|
||||
if rehash {
|
||||
r, err := f.newHashingReader(ctx, in, func(sums hashMap) {
|
||||
hashes = sums
|
||||
})
|
||||
fs.Debugf(src, "Rehash in-fly due to incomplete or slow source set %v (err: %v)", common, err)
|
||||
if err == nil {
|
||||
wrapIn = r
|
||||
} else {
|
||||
rehash = false
|
||||
}
|
||||
}
|
||||
|
||||
wrapIn, storeHashes := f.wrapInHashingReader(ctx, in, src)
|
||||
_ = f.pruneHash(src.Remote())
|
||||
oResult, err := f.Fs.Put(ctx, wrapIn, src, options...)
|
||||
o, err = f.wrapObject(oResult, err)
|
||||
o, err := f.wrapObject(oResult, err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !rehash {
|
||||
hashes = hashMap{}
|
||||
for _, ht := range common.Array() {
|
||||
if h, e := src.Hash(ctx, ht); e == nil && h != "" {
|
||||
hashes[ht] = h
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(hashes) > 0 {
|
||||
err := o.(*Object).putHashes(ctx, hashes)
|
||||
fs.Debugf(o, "Applied %d source hashes, err: %v", len(hashes), err)
|
||||
}
|
||||
return o, err
|
||||
storeHashes(o.(*Object))
|
||||
return o, nil
|
||||
}
|
||||
|
||||
type hashingReader struct {
|
||||
|
||||
Reference in New Issue
Block a user