diff --git a/cmd/archive/extract/extract.go b/cmd/archive/extract/extract.go index 3ca1f64e2..ce7c981e2 100644 --- a/cmd/archive/extract/extract.go +++ b/cmd/archive/extract/extract.go @@ -131,7 +131,11 @@ func ArchiveExtract(ctx context.Context, dst fs.Fs, dstDir string, src fs.Fs, sr } // account and buffer the transfer // in = tr.Account(ctx, in).WithBuffer() - in := tr.Account(ctx, in0) + acc := tr.Account(ctx, in0) + in, err := acc.WithReadAtSeeker() + if err != nil { + return err + } // identify format format, _, err := archives.Identify(ctx, "", in) if err != nil { diff --git a/cmd/archive/files/files.go b/cmd/archive/files/files.go index f1c52d0c8..46f9c34d8 100644 --- a/cmd/archive/files/files.go +++ b/cmd/archive/files/files.go @@ -183,7 +183,12 @@ func newFile(ctx context.Context, obj fs.Object, fi stdfs.FileInfo) (stdfs.File, return nil, f.err } // Account the transfer - f.reader = f.transfer.Account(ctx, f.reader) + acc := f.transfer.Account(ctx, f.reader) + h, err := acc.WithReadAtSeeker() + if err != nil { + return nil, err + } + f.reader = h return f, f.err } diff --git a/cmd/archive/list/list.go b/cmd/archive/list/list.go index 2f85131ed..0e0ee43ad 100644 --- a/cmd/archive/list/list.go +++ b/cmd/archive/list/list.go @@ -167,7 +167,11 @@ func ArchiveList(ctx context.Context, src fs.Fs, srcFile string, listFn archives } // account and buffer the transfer // in = tr.Account(ctx, in).WithBuffer() - in := tr.Account(ctx, in0) + acc := tr.Account(ctx, in0) + in, err := acc.WithReadAtSeeker() + if err != nil { + return err + } // identify format format, _, err := archives.Identify(ctx, "", in) diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 0fd9af92f..587683e49 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -402,32 +402,55 @@ func (acc *Account) Read(p []byte) (n int, err error) { return acc.read(acc.in, p) } +// AccountSeeker is an Account with a Seek method. +type AccountSeeker struct { + *Account + do io.Seeker +} + +var _ io.ReadSeeker = AccountSeeker{} + // Seek to position in the object - see io.Seeker -// -// May return an error if not implemented by the underlying reader. -func (acc *Account) Seek(offset int64, whence int) (int64, error) { +func (acc AccountSeeker) Seek(offset int64, whence int) (int64, error) { acc.mu.Lock() defer acc.mu.Unlock() + return acc.do.Seek(offset, whence) +} + +// WithSeeker returns an Account with a Seek method +// +// It returns an error if the underlying reader does not have a Seek method. +func (acc *Account) WithSeeker() (AccountSeeker, error) { do, ok := acc.in.(io.Seeker) if !ok { - return 0, fmt.Errorf("internal error: Seek not implemented for %T", acc.in) + return AccountSeeker{}, fmt.Errorf("internal error: Seek not implemented for %T: %w", acc.in, errors.ErrUnsupported) } - return do.Seek(offset, whence) + return AccountSeeker{ + Account: acc, + do: do, + }, nil } +// AccountReaderAt is an Account with a ReadAt method. +type AccountReaderAt struct { + *Account + do io.ReaderAt +} + +var ( + _ io.Reader = AccountReaderAt{} + _ io.ReaderAt = AccountReaderAt{} +) + // ReadAt from off into p - see io.ReaderAt // // May return an error if not implemented by the underlying reader. -func (acc *Account) ReadAt(p []byte, off int64) (n int, err error) { +func (acc AccountReaderAt) ReadAt(p []byte, off int64) (n int, err error) { acc.mu.Lock() defer acc.mu.Unlock() - do, ok := acc.in.(io.ReaderAt) - if !ok { - return 0, fmt.Errorf("internal error: ReadAt not implemented for %T", acc.in) - } bytesUntilLimit, err := acc.checkReadBefore() if err == nil { - n, err = do.ReadAt(p, off) + n, err = acc.do.ReadAt(p, off) acc.accountRead(n) n, err = acc.checkReadAfter(bytesUntilLimit, n, err) } @@ -435,6 +458,53 @@ func (acc *Account) ReadAt(p []byte, off int64) (n int, err error) { } +// WithReaderAt returns an Account with a ReadAt method +// +// It returns an error if the underlying reader does not have a ReadAt method. +func (acc *Account) WithReaderAt() (AccountReaderAt, error) { + do, ok := acc.in.(io.ReaderAt) + if !ok { + return AccountReaderAt{}, fmt.Errorf("internal error: ReadAt not implemented for %T: %w", acc.in, errors.ErrUnsupported) + } + return AccountReaderAt{ + Account: acc, + do: do, + }, nil +} + +// AccountReadAtSeeker is an Account with both ReadAt and Seek methods +type AccountReadAtSeeker struct { + AccountReaderAt + seeker AccountSeeker +} + +var ( + _ io.Reader = AccountReadAtSeeker{} + _ io.ReaderAt = AccountReadAtSeeker{} + _ io.Seeker = AccountReadAtSeeker{} +) + +// Seek to position in the object - see io.Seeker +func (acc AccountReadAtSeeker) Seek(offset int64, whence int) (int64, error) { + return acc.seeker.Seek(offset, whence) +} + +// WithReadAtSeeker returns an Account with a ReadAt and Seek method +// +// It returns an error if the underlying reader does not have the correct methods. +func (acc *Account) WithReadAtSeeker() (AccountReadAtSeeker, error) { + readerAt, err1 := acc.WithReaderAt() + seeker, err2 := acc.WithSeeker() + err := errors.Join(err1, err2) + if err != nil { + return AccountReadAtSeeker{}, err + } + return AccountReadAtSeeker{ + AccountReaderAt: readerAt, + seeker: seeker, + }, nil +} + // Thin wrapper for w type accountWriteTo struct { w io.Writer diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index 1ed1ba48a..c8c933d37 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -3,6 +3,7 @@ package accounting import ( "bytes" "context" + "errors" "fmt" "io" "strings" @@ -384,3 +385,254 @@ func TestShortenName(t *testing.T) { }) } } + +// Test readers for WithSeeker/WithReaderAt tests +type testSeeker struct { + *bytes.Buffer +} + +func newTestSeeker(data []byte) *testSeeker { + return &testSeeker{bytes.NewBuffer(data)} +} + +func (ts *testSeeker) Close() error { return nil } + +func (ts *testSeeker) Seek(offset int64, whence int) (int64, error) { + // Simple implementation for testing + switch whence { + case io.SeekStart: + ts.Buffer = bytes.NewBuffer(ts.Buffer.Bytes()[offset:]) + return offset, nil + default: + return 0, fmt.Errorf("seek whence %d not implemented", whence) + } +} + +type testReadAtSeeker struct { + *bytes.Reader +} + +func newTestReadAtSeeker(data []byte) *testReadAtSeeker { + return &testReadAtSeeker{bytes.NewReader(data)} +} + +func (tras *testReadAtSeeker) Close() error { return nil } + +type testReaderAt struct { + data []byte + pos int +} + +func newTestReaderAt(data []byte) *testReaderAt { + return &testReaderAt{data: data} +} + +func (tra *testReaderAt) Close() error { return nil } + +func (tra *testReaderAt) Read(p []byte) (n int, err error) { + if tra.pos >= len(tra.data) { + return 0, io.EOF + } + n = copy(p, tra.data[tra.pos:]) + tra.pos += n + if tra.pos >= len(tra.data) { + err = io.EOF + } + return n, err +} + +func (tra *testReaderAt) ReadAt(p []byte, off int64) (n int, err error) { + if off < 0 || off >= int64(len(tra.data)) { + return 0, io.EOF + } + n = copy(p, tra.data[off:]) + if n < len(p) { + err = io.EOF + } + return n, err +} + +type testNoSeekNoReadAt struct { + *bytes.Buffer +} + +func newTestNoSeekNoReadAt(data []byte) *testNoSeekNoReadAt { + return &testNoSeekNoReadAt{bytes.NewBuffer(data)} +} + +func (tns *testNoSeekNoReadAt) Close() error { return nil } + +func TestAccountWithSeeker(t *testing.T) { + ctx := context.Background() + testData := []byte{1, 2, 3, 4, 5} + + t.Run("Success", func(t *testing.T) { + in := newTestSeeker(testData) + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test") + + seeker, err := acc.WithSeeker() + assert.NoError(t, err) + + // Test that it implements the expected interfaces + var _ io.Reader = seeker + var _ io.Seeker = seeker + + // Test seeking functionality + offset, err := seeker.Seek(2, io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(2), offset) + + // Test reading after seek + buf := make([]byte, 2) + n, err := seeker.Read(buf) + assert.NoError(t, err) + assert.Equal(t, 2, n) + assert.Equal(t, []byte{3, 4}, buf) + + assert.NoError(t, acc.Close()) + }) + + t.Run("NoSeeker", func(t *testing.T) { + in := newTestNoSeekNoReadAt(testData) + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test") + + _, err := acc.WithSeeker() + assert.Error(t, err) + assert.ErrorIs(t, err, errors.ErrUnsupported) + assert.Contains(t, err.Error(), "Seek not implemented for") + + assert.NoError(t, acc.Close()) + }) +} + +func TestAccountWithReaderAt(t *testing.T) { + ctx := context.Background() + testData := []byte{1, 2, 3, 4, 5} + + t.Run("Success", func(t *testing.T) { + in := newTestReaderAt(testData) + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test") + + readerAt, err := acc.WithReaderAt() + assert.NoError(t, err) + + // Test that it implements the expected interfaces + var _ io.Reader = readerAt + var _ io.ReaderAt = readerAt + + // Test ReadAt functionality + buf := make([]byte, 2) + n, err := readerAt.ReadAt(buf, 2) + assert.NoError(t, err) + assert.Equal(t, 2, n) + assert.Equal(t, []byte{3, 4}, buf) + + // Test that accounting is updated + acc.values.mu.Lock() + bytes := acc.values.bytes + acc.values.mu.Unlock() + assert.Equal(t, int64(2), bytes) + + assert.NoError(t, acc.Close()) + }) + + t.Run("NoReaderAt", func(t *testing.T) { + in := newTestNoSeekNoReadAt(testData) + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test") + + _, err := acc.WithReaderAt() + assert.Error(t, err) + assert.ErrorIs(t, err, errors.ErrUnsupported) + assert.Contains(t, err.Error(), "ReadAt not implemented for") + + assert.NoError(t, acc.Close()) + }) +} + +func TestAccountWithReadAtSeeker(t *testing.T) { + ctx := context.Background() + testData := []byte{1, 2, 3, 4, 5} + + t.Run("Success", func(t *testing.T) { + // Use testReadAtSeeker which implements ReadAt, Seek, and Close + in := newTestReadAtSeeker(testData) + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test") + + readAtSeeker, err := acc.WithReadAtSeeker() + assert.NoError(t, err) + + // Test that it implements the expected interfaces + var _ io.Reader = readAtSeeker + var _ io.ReaderAt = readAtSeeker + var _ io.Seeker = readAtSeeker + + // Test ReadAt functionality + buf := make([]byte, 2) + n, err := readAtSeeker.ReadAt(buf, 2) + assert.NoError(t, err) + assert.Equal(t, 2, n) + assert.Equal(t, []byte{3, 4}, buf) + + // Test Seek functionality + offset, err := readAtSeeker.Seek(1, io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(1), offset) + + // Test regular Read after seek + n, err = readAtSeeker.Read(buf) + assert.NoError(t, err) + assert.Equal(t, 2, n) + assert.Equal(t, []byte{2, 3}, buf) + + assert.NoError(t, acc.Close()) + }) + + t.Run("NoReadAt", func(t *testing.T) { + in := newTestSeeker(testData) // Has Seek but not ReadAt + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test") + + _, err := acc.WithReadAtSeeker() + assert.Error(t, err) + assert.ErrorIs(t, err, errors.ErrUnsupported) + + assert.NoError(t, acc.Close()) + }) + + t.Run("NoSeeker", func(t *testing.T) { + // Create a reader that has ReadAt but not Seek interface + in := &struct { + io.ReadCloser + io.ReaderAt + }{ + ReadCloser: io.NopCloser(bytes.NewReader(testData)), + ReaderAt: bytes.NewReader(testData), + } + + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test") + + _, err := acc.WithReadAtSeeker() + assert.Error(t, err) + assert.ErrorIs(t, err, errors.ErrUnsupported) + + assert.NoError(t, acc.Close()) + }) + + t.Run("NoSeekOrReadAt", func(t *testing.T) { + in := newTestNoSeekNoReadAt(testData) + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(testData)), "test") + + _, err := acc.WithReadAtSeeker() + assert.Error(t, err) + assert.ErrorIs(t, err, errors.ErrUnsupported) + + assert.NoError(t, acc.Close()) + }) +}