mirror of
https://github.com/rclone/rclone.git
synced 2026-05-12 10:03:35 -04:00
Implement multipart upload support with configurable chunk size and concurrency options. Enable OpenChunkWriter with per-chunk encryption. Enhance multipart upload handling with a new upload cutoff and error management for small files.
294 lines
7.9 KiB
Go
294 lines
7.9 KiB
Go
package internxt
|
|
|
|
import (
|
|
"context"
|
|
"crypto/cipher"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/internxt/rclone-adapter/buckets"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/chunksize"
|
|
"github.com/rclone/rclone/lib/multipart"
|
|
"github.com/rclone/rclone/lib/pool"
|
|
)
|
|
|
|
var (
	// warnStreamUpload ensures the notice about the maximum size of a
	// streaming (unknown-size) upload is logged at most once per process.
	warnStreamUpload sync.Once
)
|
|
|
|
func checkUploadChunkSize(cs fs.SizeSuffix) error {
|
|
if cs < minChunkSize {
|
|
return fmt.Errorf("%s is less than %s", cs, minChunkSize)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetUploadChunkSize sets the chunk size used for multipart uploads
|
|
func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
|
|
err := checkUploadChunkSize(cs)
|
|
if err == nil {
|
|
old := f.opt.ChunkSize
|
|
f.opt.ChunkSize = cs
|
|
return old, nil
|
|
}
|
|
return f.opt.ChunkSize, err
|
|
}
|
|
|
|
func checkUploadCutoff(cs fs.SizeSuffix) error {
|
|
if cs < minUploadCutoff {
|
|
return fmt.Errorf("%s is less than %s (Internxt requires minimum %s for multipart uploads)", cs, minUploadCutoff, minUploadCutoff)
|
|
}
|
|
if cs > maxUploadCutoff {
|
|
return fmt.Errorf("%s is greater than %s", cs, maxUploadCutoff)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetUploadCutoff sets the cutoff for switching to multipart upload
|
|
func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
|
|
err := checkUploadCutoff(cs)
|
|
if err == nil {
|
|
old := f.opt.UploadCutoff
|
|
f.opt.UploadCutoff = cs
|
|
return old, nil
|
|
}
|
|
return f.opt.UploadCutoff, err
|
|
}
|
|
|
|
// internxtChunkWriter implements fs.ChunkWriter for Internxt multipart uploads.
|
|
type internxtChunkWriter struct {
|
|
f *Fs
|
|
remote string
|
|
src fs.ObjectInfo
|
|
session *buckets.ChunkUploadSession
|
|
completedParts []buckets.CompletedPart
|
|
partsMu sync.Mutex
|
|
size int64
|
|
dirID string
|
|
meta *buckets.CreateMetaResponse
|
|
chunkSize int64
|
|
hashMu sync.Mutex
|
|
nextHashChunk int
|
|
pendingChunks map[int]*pool.RW
|
|
}
|
|
|
|
// OpenChunkWriter returns the chunk size and a ChunkWriter for multipart uploads.
|
|
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
|
|
size := src.Size()
|
|
|
|
info = fs.ChunkWriterInfo{
|
|
ChunkSize: int64(f.opt.ChunkSize),
|
|
Concurrency: f.opt.UploadConcurrency,
|
|
LeavePartsOnError: false,
|
|
}
|
|
|
|
// Reject files below the upload cutoff
|
|
if size >= 0 && size < int64(f.opt.UploadCutoff) {
|
|
return info, nil, &fs.FileTooSmallError{MinSize: int64(f.opt.UploadCutoff)}
|
|
}
|
|
|
|
chunkSize := f.opt.ChunkSize
|
|
if size < 0 {
|
|
warnStreamUpload.Do(func() {
|
|
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
|
|
chunkSize, fs.SizeSuffix(int64(chunkSize)*int64(maxUploadParts)))
|
|
})
|
|
} else {
|
|
chunkSize = chunksize.Calculator(src, size, maxUploadParts, chunkSize)
|
|
info.ChunkSize = int64(chunkSize)
|
|
}
|
|
|
|
// Ensure parent directory exists
|
|
_, dirID, err := f.dirCache.FindPath(ctx, remote, true)
|
|
if err != nil {
|
|
return info, nil, fmt.Errorf("failed to find parent directory: %w", err)
|
|
}
|
|
|
|
var session *buckets.ChunkUploadSession
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
var err error
|
|
session, err = buckets.NewChunkUploadSession(ctx, f.cfg, size, int64(chunkSize))
|
|
return f.shouldRetry(ctx, err)
|
|
})
|
|
if err != nil {
|
|
return info, nil, fmt.Errorf("failed to create upload session: %w", err)
|
|
}
|
|
|
|
w := &internxtChunkWriter{
|
|
f: f,
|
|
remote: remote,
|
|
src: src,
|
|
session: session,
|
|
size: size,
|
|
dirID: dirID,
|
|
chunkSize: int64(chunkSize),
|
|
pendingChunks: make(map[int]*pool.RW),
|
|
}
|
|
|
|
return info, w, nil
|
|
}
|
|
|
|
// WriteChunk encrypts plaintext per-chunk using AES-256-CTR at the correct
|
|
// byte offset, uploads the encrypted data, then feeds it into the ordered
|
|
// hash accumulator
|
|
func (w *internxtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
|
|
byteOffset := int64(chunkNumber) * w.chunkSize
|
|
cipherStream, err := w.session.NewCipherAtOffset(byteOffset)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
encRW := multipart.NewRW().Reserve(w.chunkSize)
|
|
cipherReader := &cipher.StreamReader{S: cipherStream, R: reader}
|
|
size, err := io.Copy(encRW, cipherReader)
|
|
if err != nil {
|
|
_ = encRW.Close()
|
|
return 0, err
|
|
}
|
|
if size == 0 {
|
|
_ = encRW.Close()
|
|
return 0, nil
|
|
}
|
|
|
|
var etag string
|
|
err = w.f.pacer.Call(func() (bool, error) {
|
|
if _, err := encRW.Seek(0, io.SeekStart); err != nil {
|
|
return false, err
|
|
}
|
|
var uploadErr error
|
|
etag, uploadErr = w.session.UploadChunk(ctx, chunkNumber, encRW, size)
|
|
return w.f.shouldRetry(ctx, uploadErr)
|
|
})
|
|
if err != nil {
|
|
_ = encRW.Close()
|
|
return 0, err
|
|
}
|
|
|
|
w.recordCompletedPart(chunkNumber, etag)
|
|
|
|
if _, err := encRW.Seek(0, io.SeekStart); err != nil {
|
|
_ = encRW.Close()
|
|
return 0, err
|
|
}
|
|
w.submitForHashing(chunkNumber, encRW)
|
|
|
|
return size, nil
|
|
}
|
|
|
|
// recordCompletedPart appends a completed part to the list (thread-safe).
|
|
func (w *internxtChunkWriter) recordCompletedPart(chunkNumber int, etag string) {
|
|
w.partsMu.Lock()
|
|
w.completedParts = append(w.completedParts, buckets.CompletedPart{
|
|
PartNumber: chunkNumber + 1,
|
|
ETag: etag,
|
|
})
|
|
w.partsMu.Unlock()
|
|
}
|
|
|
|
// hashWriter is an io.Writer that feeds data into the session's SHA-256 hash.
|
|
type hashWriter struct {
|
|
session *buckets.ChunkUploadSession
|
|
}
|
|
|
|
func (hw hashWriter) Write(p []byte) (int, error) {
|
|
hw.session.HashEncryptedData(p)
|
|
return len(p), nil
|
|
}
|
|
|
|
// submitForHashing feeds encrypted chunk data into the session's hash in order.
|
|
func (w *internxtChunkWriter) submitForHashing(chunkNumber int, encRW *pool.RW) {
|
|
w.hashMu.Lock()
|
|
defer w.hashMu.Unlock()
|
|
|
|
hw := hashWriter{w.session}
|
|
|
|
if chunkNumber == w.nextHashChunk {
|
|
_, _ = encRW.WriteTo(hw)
|
|
_ = encRW.Close()
|
|
w.nextHashChunk++
|
|
for {
|
|
next, ok := w.pendingChunks[w.nextHashChunk]
|
|
if !ok {
|
|
break
|
|
}
|
|
_, _ = next.WriteTo(hw)
|
|
_ = next.Close()
|
|
delete(w.pendingChunks, w.nextHashChunk)
|
|
w.nextHashChunk++
|
|
}
|
|
} else {
|
|
w.pendingChunks[chunkNumber] = encRW
|
|
}
|
|
}
|
|
|
|
// Close completes the multipart upload and registers the file in Internxt Drive.
|
|
func (w *internxtChunkWriter) Close(ctx context.Context) error {
|
|
w.hashMu.Lock()
|
|
pending := len(w.pendingChunks)
|
|
if pending != 0 {
|
|
for _, rw := range w.pendingChunks {
|
|
_ = rw.Close()
|
|
}
|
|
w.pendingChunks = nil
|
|
}
|
|
w.hashMu.Unlock()
|
|
if pending != 0 {
|
|
return fmt.Errorf("internal error: %d chunks still pending hash", pending)
|
|
}
|
|
|
|
// Sort parts by part number
|
|
w.partsMu.Lock()
|
|
sort.Slice(w.completedParts, func(i, j int) bool {
|
|
return w.completedParts[i].PartNumber < w.completedParts[j].PartNumber
|
|
})
|
|
parts := make([]buckets.CompletedPart, len(w.completedParts))
|
|
copy(parts, w.completedParts)
|
|
w.partsMu.Unlock()
|
|
|
|
// Finish multipart upload (SDK computes hash + calls FinishMultipartUpload)
|
|
var finishResp *buckets.FinishUploadResp
|
|
err := w.f.pacer.Call(func() (bool, error) {
|
|
var err error
|
|
finishResp, err = w.session.Finish(ctx, parts)
|
|
return w.f.shouldRetry(ctx, err)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to finish multipart upload: %w", err)
|
|
}
|
|
|
|
// Create file metadata in Internxt Drive
|
|
baseName := w.f.opt.Encoding.FromStandardName(path.Base(w.remote))
|
|
name := strings.TrimSuffix(baseName, path.Ext(baseName))
|
|
ext := strings.TrimPrefix(path.Ext(baseName), ".")
|
|
|
|
var meta *buckets.CreateMetaResponse
|
|
err = w.f.pacer.Call(func() (bool, error) {
|
|
var err error
|
|
meta, err = buckets.CreateMetaFile(ctx, w.f.cfg,
|
|
name, w.f.cfg.Bucket, &finishResp.ID, "03-aes",
|
|
w.dirID, name, ext, w.size, w.src.ModTime(ctx))
|
|
return w.f.shouldRetry(ctx, err)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create file metadata: %w", err)
|
|
}
|
|
w.meta = meta
|
|
|
|
return nil
|
|
}
|
|
|
|
// Abort cleans up after a failed upload.
|
|
func (w *internxtChunkWriter) Abort(ctx context.Context) error {
|
|
w.hashMu.Lock()
|
|
for _, rw := range w.pendingChunks {
|
|
_ = rw.Close()
|
|
}
|
|
w.pendingChunks = nil
|
|
w.hashMu.Unlock()
|
|
fs.Logf(w.f, "Multipart upload aborted for %s", w.remote)
|
|
return nil
|
|
}
|