mirror of
https://github.com/rclone/rclone.git
synced 2026-05-12 10:03:35 -04:00
accounting: fix rcat/copyurl for files.com
The files.com integration tests for rcat/copyurl were failing because
fs/account.Account was declaring a ReadAt method when the underlying
handle did not support it. The files.com SDK decided to use the ReadAt
method to speed transfers up which failed.
ReadAt and Seek methods were added in this commit to support the
archive command:
409dc75328 accounting: add io.Seeker/io.ReaderAt support to accounting.Account
This fixes the problem by adding new methods to the Account object
WithSeeker/WithReaderAt/WithReadAtSeeker which produce an object with
the desired methods or errors if it isn't possible.
This stops Account advertising things it can't do which is bad Go
practice.
This commit is contained in:
@@ -131,7 +131,11 @@ func ArchiveExtract(ctx context.Context, dst fs.Fs, dstDir string, src fs.Fs, sr
|
||||
}
|
||||
// account and buffer the transfer
|
||||
// in = tr.Account(ctx, in).WithBuffer()
|
||||
in := tr.Account(ctx, in0)
|
||||
acc := tr.Account(ctx, in0)
|
||||
in, err := acc.WithReadAtSeeker()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// identify format
|
||||
format, _, err := archives.Identify(ctx, "", in)
|
||||
if err != nil {
|
||||
|
||||
@@ -183,7 +183,12 @@ func newFile(ctx context.Context, obj fs.Object, fi stdfs.FileInfo) (stdfs.File,
|
||||
return nil, f.err
|
||||
}
|
||||
// Account the transfer
|
||||
f.reader = f.transfer.Account(ctx, f.reader)
|
||||
acc := f.transfer.Account(ctx, f.reader)
|
||||
h, err := acc.WithReadAtSeeker()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.reader = h
|
||||
|
||||
return f, f.err
|
||||
}
|
||||
|
||||
@@ -167,7 +167,11 @@ func ArchiveList(ctx context.Context, src fs.Fs, srcFile string, listFn archives
|
||||
}
|
||||
// account and buffer the transfer
|
||||
// in = tr.Account(ctx, in).WithBuffer()
|
||||
in := tr.Account(ctx, in0)
|
||||
acc := tr.Account(ctx, in0)
|
||||
in, err := acc.WithReadAtSeeker()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// identify format
|
||||
format, _, err := archives.Identify(ctx, "", in)
|
||||
|
||||
|
||||
@@ -402,32 +402,55 @@ func (acc *Account) Read(p []byte) (n int, err error) {
|
||||
return acc.read(acc.in, p)
|
||||
}
|
||||
|
||||
// AccountSeeker is an Account with a Seek method.
|
||||
type AccountSeeker struct {
|
||||
*Account
|
||||
do io.Seeker
|
||||
}
|
||||
|
||||
var _ io.ReadSeeker = AccountSeeker{}
|
||||
|
||||
// Seek to position in the object - see io.Seeker
|
||||
//
|
||||
// May return an error if not implemented by the underlying reader.
|
||||
func (acc *Account) Seek(offset int64, whence int) (int64, error) {
|
||||
func (acc AccountSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
acc.mu.Lock()
|
||||
defer acc.mu.Unlock()
|
||||
return acc.do.Seek(offset, whence)
|
||||
}
|
||||
|
||||
// WithSeeker returns an Account with a Seek method
|
||||
//
|
||||
// It returns an error if the underlying reader does not have a Seek method.
|
||||
func (acc *Account) WithSeeker() (AccountSeeker, error) {
|
||||
do, ok := acc.in.(io.Seeker)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("internal error: Seek not implemented for %T", acc.in)
|
||||
return AccountSeeker{}, fmt.Errorf("internal error: Seek not implemented for %T: %w", acc.in, errors.ErrUnsupported)
|
||||
}
|
||||
return do.Seek(offset, whence)
|
||||
return AccountSeeker{
|
||||
Account: acc,
|
||||
do: do,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AccountReaderAt is an Account with a ReadAt method.
|
||||
type AccountReaderAt struct {
|
||||
*Account
|
||||
do io.ReaderAt
|
||||
}
|
||||
|
||||
var (
|
||||
_ io.Reader = AccountReaderAt{}
|
||||
_ io.ReaderAt = AccountReaderAt{}
|
||||
)
|
||||
|
||||
// ReadAt from off into p - see io.ReaderAt
|
||||
//
|
||||
// May return an error if not implemented by the underlying reader.
|
||||
func (acc *Account) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
func (acc AccountReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
acc.mu.Lock()
|
||||
defer acc.mu.Unlock()
|
||||
do, ok := acc.in.(io.ReaderAt)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("internal error: ReadAt not implemented for %T", acc.in)
|
||||
}
|
||||
bytesUntilLimit, err := acc.checkReadBefore()
|
||||
if err == nil {
|
||||
n, err = do.ReadAt(p, off)
|
||||
n, err = acc.do.ReadAt(p, off)
|
||||
acc.accountRead(n)
|
||||
n, err = acc.checkReadAfter(bytesUntilLimit, n, err)
|
||||
}
|
||||
@@ -435,6 +458,53 @@ func (acc *Account) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
|
||||
}
|
||||
|
||||
// WithReaderAt returns an Account with a ReadAt method
|
||||
//
|
||||
// It returns an error if the underlying reader does not have a ReadAt method.
|
||||
func (acc *Account) WithReaderAt() (AccountReaderAt, error) {
|
||||
do, ok := acc.in.(io.ReaderAt)
|
||||
if !ok {
|
||||
return AccountReaderAt{}, fmt.Errorf("internal error: ReadAt not implemented for %T: %w", acc.in, errors.ErrUnsupported)
|
||||
}
|
||||
return AccountReaderAt{
|
||||
Account: acc,
|
||||
do: do,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AccountReadAtSeeker is an Account with both ReadAt and Seek methods
|
||||
type AccountReadAtSeeker struct {
|
||||
AccountReaderAt
|
||||
seeker AccountSeeker
|
||||
}
|
||||
|
||||
var (
|
||||
_ io.Reader = AccountReadAtSeeker{}
|
||||
_ io.ReaderAt = AccountReadAtSeeker{}
|
||||
_ io.Seeker = AccountReadAtSeeker{}
|
||||
)
|
||||
|
||||
// Seek to position in the object - see io.Seeker
|
||||
func (acc AccountReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
return acc.seeker.Seek(offset, whence)
|
||||
}
|
||||
|
||||
// WithReadAtSeeker returns an Account with a ReadAt and Seek method
|
||||
//
|
||||
// It returns an error if the underlying reader does not have the correct methods.
|
||||
func (acc *Account) WithReadAtSeeker() (AccountReadAtSeeker, error) {
|
||||
readerAt, err1 := acc.WithReaderAt()
|
||||
seeker, err2 := acc.WithSeeker()
|
||||
err := errors.Join(err1, err2)
|
||||
if err != nil {
|
||||
return AccountReadAtSeeker{}, err
|
||||
}
|
||||
return AccountReadAtSeeker{
|
||||
AccountReaderAt: readerAt,
|
||||
seeker: seeker,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Thin wrapper for w
|
||||
type accountWriteTo struct {
|
||||
w io.Writer
|
||||
|
||||
@@ -3,6 +3,7 @@ package accounting
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
@@ -384,3 +385,254 @@ func TestShortenName(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test readers for WithSeeker/WithReaderAt tests
|
||||
type testSeeker struct {
|
||||
*bytes.Buffer
|
||||
}
|
||||
|
||||
func newTestSeeker(data []byte) *testSeeker {
|
||||
return &testSeeker{bytes.NewBuffer(data)}
|
||||
}
|
||||
|
||||
func (ts *testSeeker) Close() error { return nil }
|
||||
|
||||
func (ts *testSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
// Simple implementation for testing
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
ts.Buffer = bytes.NewBuffer(ts.Buffer.Bytes()[offset:])
|
||||
return offset, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("seek whence %d not implemented", whence)
|
||||
}
|
||||
}
|
||||
|
||||
type testReadAtSeeker struct {
|
||||
*bytes.Reader
|
||||
}
|
||||
|
||||
func newTestReadAtSeeker(data []byte) *testReadAtSeeker {
|
||||
return &testReadAtSeeker{bytes.NewReader(data)}
|
||||
}
|
||||
|
||||
func (tras *testReadAtSeeker) Close() error { return nil }
|
||||
|
||||
type testReaderAt struct {
|
||||
data []byte
|
||||
pos int
|
||||
}
|
||||
|
||||
func newTestReaderAt(data []byte) *testReaderAt {
|
||||
return &testReaderAt{data: data}
|
||||
}
|
||||
|
||||
func (tra *testReaderAt) Close() error { return nil }
|
||||
|
||||
func (tra *testReaderAt) Read(p []byte) (n int, err error) {
|
||||
if tra.pos >= len(tra.data) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n = copy(p, tra.data[tra.pos:])
|
||||
tra.pos += n
|
||||
if tra.pos >= len(tra.data) {
|
||||
err = io.EOF
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (tra *testReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
if off < 0 || off >= int64(len(tra.data)) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n = copy(p, tra.data[off:])
|
||||
if n < len(p) {
|
||||
err = io.EOF
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
type testNoSeekNoReadAt struct {
|
||||
*bytes.Buffer
|
||||
}
|
||||
|
||||
func newTestNoSeekNoReadAt(data []byte) *testNoSeekNoReadAt {
|
||||
return &testNoSeekNoReadAt{bytes.NewBuffer(data)}
|
||||
}
|
||||
|
||||
func (tns *testNoSeekNoReadAt) Close() error { return nil }
|
||||
|
||||
func TestAccountWithSeeker(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
testData := []byte{1, 2, 3, 4, 5}
|
||||
|
||||
t.Run("Success", func(t *testing.T) {
|
||||
in := newTestSeeker(testData)
|
||||
stats := NewStats(ctx)
|
||||
acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test")
|
||||
|
||||
seeker, err := acc.WithSeeker()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Test that it implements the expected interfaces
|
||||
var _ io.Reader = seeker
|
||||
var _ io.Seeker = seeker
|
||||
|
||||
// Test seeking functionality
|
||||
offset, err := seeker.Seek(2, io.SeekStart)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(2), offset)
|
||||
|
||||
// Test reading after seek
|
||||
buf := make([]byte, 2)
|
||||
n, err := seeker.Read(buf)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, n)
|
||||
assert.Equal(t, []byte{3, 4}, buf)
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
})
|
||||
|
||||
t.Run("NoSeeker", func(t *testing.T) {
|
||||
in := newTestNoSeekNoReadAt(testData)
|
||||
stats := NewStats(ctx)
|
||||
acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test")
|
||||
|
||||
_, err := acc.WithSeeker()
|
||||
assert.Error(t, err)
|
||||
assert.ErrorIs(t, err, errors.ErrUnsupported)
|
||||
assert.Contains(t, err.Error(), "Seek not implemented for")
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
})
|
||||
}
|
||||
|
||||
func TestAccountWithReaderAt(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
testData := []byte{1, 2, 3, 4, 5}
|
||||
|
||||
t.Run("Success", func(t *testing.T) {
|
||||
in := newTestReaderAt(testData)
|
||||
stats := NewStats(ctx)
|
||||
acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test")
|
||||
|
||||
readerAt, err := acc.WithReaderAt()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Test that it implements the expected interfaces
|
||||
var _ io.Reader = readerAt
|
||||
var _ io.ReaderAt = readerAt
|
||||
|
||||
// Test ReadAt functionality
|
||||
buf := make([]byte, 2)
|
||||
n, err := readerAt.ReadAt(buf, 2)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, n)
|
||||
assert.Equal(t, []byte{3, 4}, buf)
|
||||
|
||||
// Test that accounting is updated
|
||||
acc.values.mu.Lock()
|
||||
bytes := acc.values.bytes
|
||||
acc.values.mu.Unlock()
|
||||
assert.Equal(t, int64(2), bytes)
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
})
|
||||
|
||||
t.Run("NoReaderAt", func(t *testing.T) {
|
||||
in := newTestNoSeekNoReadAt(testData)
|
||||
stats := NewStats(ctx)
|
||||
acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test")
|
||||
|
||||
_, err := acc.WithReaderAt()
|
||||
assert.Error(t, err)
|
||||
assert.ErrorIs(t, err, errors.ErrUnsupported)
|
||||
assert.Contains(t, err.Error(), "ReadAt not implemented for")
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
})
|
||||
}
|
||||
|
||||
func TestAccountWithReadAtSeeker(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
testData := []byte{1, 2, 3, 4, 5}
|
||||
|
||||
t.Run("Success", func(t *testing.T) {
|
||||
// Use testReadAtSeeker which implements ReadAt, Seek, and Close
|
||||
in := newTestReadAtSeeker(testData)
|
||||
stats := NewStats(ctx)
|
||||
acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test")
|
||||
|
||||
readAtSeeker, err := acc.WithReadAtSeeker()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Test that it implements the expected interfaces
|
||||
var _ io.Reader = readAtSeeker
|
||||
var _ io.ReaderAt = readAtSeeker
|
||||
var _ io.Seeker = readAtSeeker
|
||||
|
||||
// Test ReadAt functionality
|
||||
buf := make([]byte, 2)
|
||||
n, err := readAtSeeker.ReadAt(buf, 2)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, n)
|
||||
assert.Equal(t, []byte{3, 4}, buf)
|
||||
|
||||
// Test Seek functionality
|
||||
offset, err := readAtSeeker.Seek(1, io.SeekStart)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(1), offset)
|
||||
|
||||
// Test regular Read after seek
|
||||
n, err = readAtSeeker.Read(buf)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, n)
|
||||
assert.Equal(t, []byte{2, 3}, buf)
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
})
|
||||
|
||||
t.Run("NoReadAt", func(t *testing.T) {
|
||||
in := newTestSeeker(testData) // Has Seek but not ReadAt
|
||||
stats := NewStats(ctx)
|
||||
acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test")
|
||||
|
||||
_, err := acc.WithReadAtSeeker()
|
||||
assert.Error(t, err)
|
||||
assert.ErrorIs(t, err, errors.ErrUnsupported)
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
})
|
||||
|
||||
t.Run("NoSeeker", func(t *testing.T) {
|
||||
// Create a reader that has ReadAt but not Seek interface
|
||||
in := &struct {
|
||||
io.ReadCloser
|
||||
io.ReaderAt
|
||||
}{
|
||||
ReadCloser: io.NopCloser(bytes.NewReader(testData)),
|
||||
ReaderAt: bytes.NewReader(testData),
|
||||
}
|
||||
|
||||
stats := NewStats(ctx)
|
||||
acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test")
|
||||
|
||||
_, err := acc.WithReadAtSeeker()
|
||||
assert.Error(t, err)
|
||||
assert.ErrorIs(t, err, errors.ErrUnsupported)
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
})
|
||||
|
||||
t.Run("NoSeekOrReadAt", func(t *testing.T) {
|
||||
in := newTestNoSeekNoReadAt(testData)
|
||||
stats := NewStats(ctx)
|
||||
acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test")
|
||||
|
||||
_, err := acc.WithReadAtSeeker()
|
||||
assert.Error(t, err)
|
||||
assert.ErrorIs(t, err, errors.ErrUnsupported)
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user