internxt: implement multipart uploads

Implement multipart upload support with configurable chunk size and concurrency options.

Enable OpenChunkWriter with per-chunk encryption.

Enhance multipart upload handling with a new upload cutoff and error management for small files.
José Zúniga
2026-04-24 18:20:18 +02:00
committed by GitHub
parent 328ac017c1
commit c385d8586a
6 changed files with 421 additions and 32 deletions


@@ -30,15 +30,21 @@ import (
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/dircache"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/oauthutil"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
)
const (
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
maxUploadParts = 10000 // maximum number of parts in a multipart upload
minChunkSize = fs.SizeSuffix(5 * 1024 * 1024)
minUploadCutoff = fs.SizeSuffix(100 * 1024 * 1024)
defaultUploadCutoff = fs.SizeSuffix(100 * 1024 * 1024)
maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
)
// shouldRetry determines if an error should be retried.
@@ -101,6 +107,21 @@ func init() {
Default: true,
Advanced: true,
Help: "Skip hash validation when downloading files.\n\nBy default, hash validation is disabled. Set this to false to enable validation.",
}, {
Name: "upload_concurrency",
Help: "Concurrency for multipart uploads.\n\nThis is the number of chunks of the same file that are uploaded concurrently.\n\nNote that each chunk is buffered in memory.",
Default: 4,
Advanced: true,
}, {
Name: "upload_cutoff",
Help: "Cutoff for switching to multipart upload.\n\nAny files larger than this will be uploaded in chunks of chunk_size.\nThe minimum is 100 MiB and the maximum is 5 GiB.",
Default: defaultUploadCutoff,
Advanced: true,
}, {
Name: "chunk_size",
Help: "Chunk size for multipart uploads.\n\nFiles larger than upload_cutoff will be uploaded in chunks of this size.\n\nMemory usage is approximately chunk_size * upload_concurrency.",
Default: fs.SizeSuffix(30 * 1024 * 1024),
Advanced: true,
}, {
Name: rclone_config.ConfigEncoding,
Help: rclone_config.ConfigEncodingHelp,
@@ -194,6 +215,9 @@ type Options struct {
TwoFA string `config:"2fa"`
Mnemonic string `config:"mnemonic"`
SkipHashValidation bool `config:"skip_hash_validation"`
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
UploadConcurrency int `config:"upload_concurrency"`
Encoding encoder.MultiEncoder `config:"encoding"`
}
@@ -238,6 +262,11 @@ func (f *Fs) Features() *fs.Features {
return f.features
}
// DirCacheFlush resets the directory cache
func (f *Fs) DirCacheFlush() {
f.dirCache.ResetRoot()
}
// Hashes returns type of hashes supported by Internxt
func (f *Fs) Hashes() hash.Set {
return hash.NewHashSet()
@@ -255,6 +284,13 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
return nil, err
}
if err := checkUploadChunkSize(opt.ChunkSize); err != nil {
return nil, fmt.Errorf("internxt: chunk size: %w", err)
}
if err := checkUploadCutoff(opt.UploadCutoff); err != nil {
return nil, fmt.Errorf("internxt: upload cutoff: %w", err)
}
if opt.Mnemonic == "" {
return nil, errors.New("mnemonic is required - please run: rclone config reconnect " + name + ":")
}
@@ -884,32 +920,53 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
fs.Debugf(o.f, "Renamed existing file %s to backup %s.%s (UUID: %s)", remote, backupName, backupType, backupUUID)
}
size := src.Size()
var meta *buckets.CreateMetaResponse
if size < 0 || size >= int64(o.f.opt.UploadCutoff) {
	chunkWriter, uploadErr := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
		Open:        o.f,
		OpenOptions: options,
	})
	if uploadErr != nil {
		if isEmptyFileLimitError(uploadErr) {
			o.restoreBackupFile(ctx, backupUUID, origName, origType)
			return fs.ErrorCantUploadEmptyFiles
		}
		o.restoreBackupFile(ctx, backupUUID, origName, origType)
		return uploadErr
	}
	w := chunkWriter.(*internxtChunkWriter)
	meta = w.meta
} else {
	// Use single-part upload for small files
	err = o.f.pacer.CallNoRetry(func() (bool, error) {
		var err error
		meta, err = buckets.UploadFileStreamAuto(ctx,
			o.f.cfg,
			dirID,
			o.f.opt.Encoding.FromStandardName(path.Base(remote)),
			in,
			size,
			src.ModTime(ctx),
		)
		return o.f.shouldRetry(ctx, err)
	})
	if err != nil {
		meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID)
	}
	if err != nil && isEmptyFileLimitError(err) {
		o.restoreBackupFile(ctx, backupUUID, origName, origType)
		return fs.ErrorCantUploadEmptyFiles
	}
	if err != nil {
		o.restoreBackupFile(ctx, backupUUID, origName, origType)
		return err
	}
}
// Update object metadata


@@ -3,6 +3,7 @@ package internxt_test
import (
"testing"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fstest/fstests"
)
@@ -10,5 +11,9 @@ import (
func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: "TestInternxt:",
ChunkedUpload: fstests.ChunkedUploadConfig{
MinChunkSize: 100 * fs.Mebi,
NeedMultipleChunks: true,
},
})
}

backend/internxt/upload.go (new file)

@@ -0,0 +1,293 @@
package internxt
import (
"context"
"crypto/cipher"
"fmt"
"io"
"path"
"sort"
"strings"
"sync"
"github.com/internxt/rclone-adapter/buckets"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pool"
)
var warnStreamUpload sync.Once
func checkUploadChunkSize(cs fs.SizeSuffix) error {
if cs < minChunkSize {
return fmt.Errorf("%s is less than %s", cs, minChunkSize)
}
return nil
}
// SetUploadChunkSize sets the chunk size used for multipart uploads
func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
err := checkUploadChunkSize(cs)
if err == nil {
old := f.opt.ChunkSize
f.opt.ChunkSize = cs
return old, nil
}
return f.opt.ChunkSize, err
}
func checkUploadCutoff(cs fs.SizeSuffix) error {
if cs < minUploadCutoff {
return fmt.Errorf("%s is less than %s (Internxt requires minimum %s for multipart uploads)", cs, minUploadCutoff, minUploadCutoff)
}
if cs > maxUploadCutoff {
return fmt.Errorf("%s is greater than %s", cs, maxUploadCutoff)
}
return nil
}
// SetUploadCutoff sets the cutoff for switching to multipart upload
func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
err := checkUploadCutoff(cs)
if err == nil {
old := f.opt.UploadCutoff
f.opt.UploadCutoff = cs
return old, nil
}
return f.opt.UploadCutoff, err
}
// internxtChunkWriter implements fs.ChunkWriter for Internxt multipart uploads.
type internxtChunkWriter struct {
f *Fs
remote string
src fs.ObjectInfo
session *buckets.ChunkUploadSession
completedParts []buckets.CompletedPart
partsMu sync.Mutex
size int64
dirID string
meta *buckets.CreateMetaResponse
chunkSize int64
hashMu sync.Mutex
nextHashChunk int
pendingChunks map[int]*pool.RW
}
// OpenChunkWriter returns the chunk size and a ChunkWriter for multipart uploads.
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
size := src.Size()
info = fs.ChunkWriterInfo{
ChunkSize: int64(f.opt.ChunkSize),
Concurrency: f.opt.UploadConcurrency,
LeavePartsOnError: false,
}
// Reject files below the upload cutoff
if size >= 0 && size < int64(f.opt.UploadCutoff) {
return info, nil, &fs.FileTooSmallError{MinSize: int64(f.opt.UploadCutoff)}
}
chunkSize := f.opt.ChunkSize
if size < 0 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
chunkSize, fs.SizeSuffix(int64(chunkSize)*int64(maxUploadParts)))
})
} else {
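// Known size: chunksize.Calculator raises the chunk size if needed so the
// file fits within maxUploadParts parts.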
chunkSize = chunksize.Calculator(src, size, maxUploadParts, chunkSize)
info.ChunkSize = int64(chunkSize)
}
// Ensure parent directory exists
_, dirID, err := f.dirCache.FindPath(ctx, remote, true)
if err != nil {
return info, nil, fmt.Errorf("failed to find parent directory: %w", err)
}
var session *buckets.ChunkUploadSession
err = f.pacer.Call(func() (bool, error) {
var err error
session, err = buckets.NewChunkUploadSession(ctx, f.cfg, size, int64(chunkSize))
return f.shouldRetry(ctx, err)
})
if err != nil {
return info, nil, fmt.Errorf("failed to create upload session: %w", err)
}
w := &internxtChunkWriter{
f: f,
remote: remote,
src: src,
session: session,
size: size,
dirID: dirID,
chunkSize: int64(chunkSize),
pendingChunks: make(map[int]*pool.RW),
}
return info, w, nil
}
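As a quick sanity check on the streaming warning above, the maximum streamed file size follows directly from the constants in this commit (plain arithmetic, nothing assumed beyond the defaults):

package main

import "fmt"

func main() {
	const chunkSize = 30 * 1024 * 1024 // default chunk_size from this commit
	const maxUploadParts = 10000       // cap on parts per upload
	// Largest file a streaming (unknown-size) upload can hold before
	// running out of parts: chunkSize * maxUploadParts.
	maxStream := int64(chunkSize) * maxUploadParts
	fmt.Printf("%d bytes ≈ %.0f GiB\n", maxStream, float64(maxStream)/(1<<30))
	// Output: 314572800000 bytes ≈ 293 GiB
}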
// WriteChunk encrypts plaintext per-chunk using AES-256-CTR at the correct
// byte offset, uploads the encrypted data, then feeds it into the ordered
// hash accumulator.
func (w *internxtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
byteOffset := int64(chunkNumber) * w.chunkSize
cipherStream, err := w.session.NewCipherAtOffset(byteOffset)
if err != nil {
return 0, err
}
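// Buffer the encrypted chunk in memory so it can be rewound for upload
// retries and re-read later for hashing.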
encRW := multipart.NewRW().Reserve(w.chunkSize)
cipherReader := &cipher.StreamReader{S: cipherStream, R: reader}
size, err := io.Copy(encRW, cipherReader)
if err != nil {
_ = encRW.Close()
return 0, err
}
if size == 0 {
_ = encRW.Close()
return 0, nil
}
var etag string
err = w.f.pacer.Call(func() (bool, error) {
if _, err := encRW.Seek(0, io.SeekStart); err != nil {
return false, err
}
var uploadErr error
etag, uploadErr = w.session.UploadChunk(ctx, chunkNumber, encRW, size)
return w.f.shouldRetry(ctx, uploadErr)
})
if err != nil {
_ = encRW.Close()
return 0, err
}
w.recordCompletedPart(chunkNumber, etag)
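// Rewind the buffer so the hash accumulator can re-read the encrypted bytes.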
if _, err := encRW.Seek(0, io.SeekStart); err != nil {
_ = encRW.Close()
return 0, err
}
w.submitForHashing(chunkNumber, encRW)
return size, nil
}
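NewCipherAtOffset comes from the Internxt SDK and its implementation isn't shown here. As a rough sketch of the general technique (an assumption for illustration, not the SDK's code), an AES-CTR keystream can be positioned at an arbitrary byte offset by advancing the 128-bit counter by whole blocks and discarding the partial block:

package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"encoding/binary"
	"fmt"
)

// ctrAtOffset returns a cipher.Stream whose keystream starts at byteOffset:
// advance the big-endian counter by byteOffset/BlockSize whole blocks, then
// discard the byteOffset%BlockSize leftover keystream bytes.
func ctrAtOffset(key, iv []byte, byteOffset int64) (cipher.Stream, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	ctr := make([]byte, aes.BlockSize)
	copy(ctr, iv) // iv must be aes.BlockSize (16) bytes
	addCounter(ctr, uint64(byteOffset)/aes.BlockSize)
	stream := cipher.NewCTR(block, ctr)
	discard := make([]byte, byteOffset%aes.BlockSize)
	stream.XORKeyStream(discard, discard)
	return stream, nil
}

// addCounter adds n to the low 64 bits of the big-endian counter, carrying
// into the high 64 bits on overflow.
func addCounter(ctr []byte, n uint64) {
	lo := binary.BigEndian.Uint64(ctr[8:])
	binary.BigEndian.PutUint64(ctr[8:], lo+n)
	if lo+n < lo {
		binary.BigEndian.PutUint64(ctr[:8], binary.BigEndian.Uint64(ctr[:8])+1)
	}
}

func main() {
	key := bytes.Repeat([]byte{1}, 32) // AES-256
	iv := bytes.Repeat([]byte{2}, aes.BlockSize)
	plain := bytes.Repeat([]byte("internxt!"), 100)

	// Encrypt everything in one pass...
	full := make([]byte, len(plain))
	s, _ := ctrAtOffset(key, iv, 0)
	s.XORKeyStream(full, plain)

	// ...and re-encrypt just the tail starting at offset 100.
	tail := make([]byte, len(plain)-100)
	s2, _ := ctrAtOffset(key, iv, 100)
	s2.XORKeyStream(tail, plain[100:])

	fmt.Println(bytes.Equal(full[100:], tail)) // true
}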
// recordCompletedPart appends a completed part to the list (thread-safe).
func (w *internxtChunkWriter) recordCompletedPart(chunkNumber int, etag string) {
w.partsMu.Lock()
w.completedParts = append(w.completedParts, buckets.CompletedPart{
PartNumber: chunkNumber + 1,
ETag: etag,
})
w.partsMu.Unlock()
}
// hashWriter is an io.Writer that feeds data into the session's SHA-256 hash.
type hashWriter struct {
session *buckets.ChunkUploadSession
}
func (hw hashWriter) Write(p []byte) (int, error) {
hw.session.HashEncryptedData(p)
return len(p), nil
}
// submitForHashing feeds encrypted chunk data into the session's hash in order.
func (w *internxtChunkWriter) submitForHashing(chunkNumber int, encRW *pool.RW) {
w.hashMu.Lock()
defer w.hashMu.Unlock()
hw := hashWriter{w.session}
if chunkNumber == w.nextHashChunk {
_, _ = encRW.WriteTo(hw)
_ = encRW.Close()
w.nextHashChunk++
for {
next, ok := w.pendingChunks[w.nextHashChunk]
if !ok {
break
}
_, _ = next.WriteTo(hw)
_ = next.Close()
delete(w.pendingChunks, w.nextHashChunk)
w.nextHashChunk++
}
} else {
w.pendingChunks[chunkNumber] = encRW
}
}
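The reorder buffer above is easiest to see in isolation. A minimal single-threaded sketch of the same idea (hypothetical helper, not part of this commit; the real version also needs hashMu because chunks complete concurrently):

package main

import (
	"crypto/sha256"
	"fmt"
	"hash"
)

// orderedHasher feeds chunk payloads into a hash strictly in chunk order,
// parking any chunk that arrives early - the same reorder-buffer idea as
// submitForHashing above.
type orderedHasher struct {
	h       hash.Hash
	next    int
	pending map[int][]byte
}

func (o *orderedHasher) submit(n int, data []byte) {
	if n != o.next {
		o.pending[n] = data // arrived early; buffer it
		return
	}
	o.h.Write(data)
	o.next++
	// Drain any buffered successors that are now in order.
	for buf, ok := o.pending[o.next]; ok; buf, ok = o.pending[o.next] {
		o.h.Write(buf)
		delete(o.pending, o.next)
		o.next++
	}
}

func main() {
	o := &orderedHasher{h: sha256.New(), pending: map[int][]byte{}}
	o.submit(1, []byte("bb")) // out of order: parked
	o.submit(2, []byte("cc")) // out of order: parked
	o.submit(0, []byte("aa")) // in order: hashes aa, then drains bb, cc
	fmt.Printf("%x\n", o.h.Sum(nil)) // identical to sha256("aabbcc")
}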
// Close completes the multipart upload and registers the file in Internxt Drive.
func (w *internxtChunkWriter) Close(ctx context.Context) error {
w.hashMu.Lock()
pending := len(w.pendingChunks)
if pending != 0 {
for _, rw := range w.pendingChunks {
_ = rw.Close()
}
w.pendingChunks = nil
}
w.hashMu.Unlock()
if pending != 0 {
return fmt.Errorf("internal error: %d chunks still pending hash", pending)
}
// Sort parts by part number
w.partsMu.Lock()
sort.Slice(w.completedParts, func(i, j int) bool {
return w.completedParts[i].PartNumber < w.completedParts[j].PartNumber
})
parts := make([]buckets.CompletedPart, len(w.completedParts))
copy(parts, w.completedParts)
w.partsMu.Unlock()
// Finish multipart upload (SDK computes hash + calls FinishMultipartUpload)
var finishResp *buckets.FinishUploadResp
err := w.f.pacer.Call(func() (bool, error) {
var err error
finishResp, err = w.session.Finish(ctx, parts)
return w.f.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to finish multipart upload: %w", err)
}
// Create file metadata in Internxt Drive
baseName := w.f.opt.Encoding.FromStandardName(path.Base(w.remote))
name := strings.TrimSuffix(baseName, path.Ext(baseName))
ext := strings.TrimPrefix(path.Ext(baseName), ".")
var meta *buckets.CreateMetaResponse
err = w.f.pacer.Call(func() (bool, error) {
var err error
meta, err = buckets.CreateMetaFile(ctx, w.f.cfg,
name, w.f.cfg.Bucket, &finishResp.ID, "03-aes",
w.dirID, name, ext, w.size, w.src.ModTime(ctx))
return w.f.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to create file metadata: %w", err)
}
w.meta = meta
return nil
}
// Abort cleans up after a failed upload.
func (w *internxtChunkWriter) Abort(ctx context.Context) error {
w.hashMu.Lock()
for _, rw := range w.pendingChunks {
_ = rw.Close()
}
w.pendingChunks = nil
w.hashMu.Unlock()
fs.Logf(w.f, "Multipart upload aborted for %s", w.remote)
return nil
}


@@ -4,6 +4,7 @@ package fs
import (
"context"
"errors"
"fmt"
"io"
"math"
"time"
@@ -51,8 +52,26 @@ var (
ErrorCommandNotFound = errors.New("command not found")
ErrorFileNameTooLong = errors.New("file name too long")
ErrorCantListRoot = errors.New("can't list root")
ErrorFileTooSmall = errors.New("file too small for multipart upload")
)
// FileTooSmallError is returned by OpenChunkWriter when a file is below the
// backend's minimum size for multipart uploads. It wraps ErrorFileTooSmall
// and carries the minimum size so callers can retry with a larger file.
type FileTooSmallError struct {
MinSize int64
}
// Error implements the error interface.
func (e *FileTooSmallError) Error() string {
return fmt.Sprintf("file too small for multipart upload (minimum %d bytes)", e.MinSize)
}
// Unwrap returns ErrorFileTooSmall so errors.Is works.
func (e *FileTooSmallError) Unwrap() error {
return ErrorFileTooSmall
}
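Because Unwrap returns the sentinel, callers can match either the sentinel or the concrete type. A small illustration (the wrapping and sizes here are arbitrary):

package main

import (
	"errors"
	"fmt"

	"github.com/rclone/rclone/fs"
)

func main() {
	// Wrap the typed error the way a backend might.
	err := fmt.Errorf("open chunk writer: %w", &fs.FileTooSmallError{MinSize: 100 * 1024 * 1024})

	// Sentinel match works through both the %w wrap and Unwrap.
	fmt.Println(errors.Is(err, fs.ErrorFileTooSmall)) // true

	// Type match recovers MinSize for a fallback decision.
	var tooSmall *fs.FileTooSmallError
	if errors.As(err, &tooSmall) {
		fmt.Println(tooSmall.MinSize) // 104857600
	}
}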
// CheckClose is a utility function used to check the return from
// Close in a defer statement.
func CheckClose(c io.Closer, err *error) {


@@ -134,7 +134,13 @@ func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int
const fileName = "chunksize-probe"
src := object.NewStaticObjectInfo(fileName, time.Now(), int64(100*fs.Mebi), true, nil, nil)
info, writer, err := features.OpenChunkWriter(ctx, fileName, src)
if err != nil {
	// If the probe fails because the file is too small, skip
	if errors.Is(err, fs.ErrorFileTooSmall) {
		t.Skipf("probe file too small for multipart upload: %v", err)
	}
	require.NoError(t, err)
}
chunkSize = int(info.ChunkSize)
err = writer.Abort(ctx)
require.NoError(t, err)
@@ -215,6 +221,9 @@ func TestMultithreadCopy(t *testing.T) {
}()
dst, err = multiThreadCopy(ctx, fDst, fileName, src, test.streams, tr)
if errors.Is(err, fs.ErrorFileTooSmall) {
t.Skipf("file too small for multipart upload: %v", err)
}
require.NoError(t, err)
assert.Equal(t, src.Size(), dst.Size())
@@ -292,7 +301,6 @@ func TestMultithreadCopyAbort(t *testing.T) {
ctx := context.Background()
chunkSize := skipIfNotMultithread(ctx, t, r)
size := 2*chunkSize + 1
if *fstest.SizeLimit > 0 && int64(size) > *fstest.SizeLimit {
t.Skipf("exceeded file size limit %d > %d", size, *fstest.SizeLimit)
}
@@ -318,6 +326,9 @@ func TestMultithreadCopyAbort(t *testing.T) {
}()
wg := new(sync.WaitGroup)
dst, err := multiThreadCopy(ctx, r.Fremote, fileName, errorObject{src, int64(size), wg}, 1, tr)
if errors.Is(err, fs.ErrorFileTooSmall) {
t.Skipf("file too small for multipart upload: %v", err)
}
assert.Error(t, err)
assert.Nil(t, dst)


@@ -820,19 +820,23 @@ func Run(t *testing.T, opt *Opt) {
t.Skip("FS has no OpenChunkWriter interface")
}
size5MBs := 5 * 1024 * 1024
size1MB := 1 * 1024 * 1024
totalSize := int64(size5MBs*2 + size1MB)
path := "writer-at-subdir/writer-at-file"
objSrc := object.NewStaticObjectInfo(path+"-WRONG-REMOTE", file1.ModTime, totalSize, true, nil, nil)
_, out, err := openChunkWriter(ctx, path, objSrc, &fs.ChunkOption{
ChunkSize: int64(size5MBs),
})
if errors.Is(err, fs.ErrorFileTooSmall) {
t.Skipf("file too small for multipart upload: %v", err)
}
require.NoError(t, err)
contents1 := random.String(size5MBs)
contents2 := random.String(size5MBs)
contents3 := random.String(size1MB)
var n int64
n, err = out.WriteChunk(ctx, 1, strings.NewReader(contents2))
assert.NoError(t, err)