From acbb0db3d0fa331e07ebf2d9e4eaf5c550fac6fe Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Sun, 16 Jul 2023 09:28:15 +0200 Subject: [PATCH] Parallel upload block with concurrency control --- README.md | 22 ++++++++++++---------- common/config.go | 16 +++++++++++++++- drive.go | 9 +++++++-- file.go | 13 +++++++++---- 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 54086b5..38e106e 100644 --- a/README.md +++ b/README.md @@ -19,11 +19,12 @@ We are using a fork of the [proton-go-api](https://github.com/henrybear327/go-pr # Drive APIs > In collaboration with Azimjon Pulatov, in memory of our good old days at Meta, London, in the summer of 2022. +> > Thanks to Anson Chen for the motivation and some initial help on various matters! Currently, the development are split into 2 versions. V1 supports the features [required by rclone](https://github.com/henrybear327/rclone/blob/master/fs/types.go), such as `file listing`. As the unit and integration tests from rclone have all been passed, we would stabilize this and then move onto developing V2. -V2 will bring in optimizations and enhancements, such as optimizing uploading and downloading performance, supporting thumbnails, etc. +V2 will bring in optimizations and enhancements, esp. supporting thumbnails. Please see the list below. ## V1 @@ -97,11 +98,10 @@ V2 will bring in optimizations and enhancements, such as optimizing uploading an - [x] Not found: ERROR RESTY 422: File or folder was not found. (Code=2501, Status=422), Attempt 1 - [x] Failed upload: Draft already exists on this revision (Code=2500, Status=409) - [x] Fix file upload progress -> If the upload failed, please Replace file. If the upload is still in progress, replacing it will cancel the ongoing upload +- [x] Concurrency control on file encryption, decryption, and block upload ### Known limitations -- Large file handling: for downloading, the file will be written when all blocks are decrypted and checked -- Crypto-related operations, e.g. signature verification, still needs to cross check with iOS or web open source codebase - No thumbnails, respecting accepted MIME types, max upload size, can't init Proton Drive, etc. - Assumptions - only one main share per account @@ -109,20 +109,22 @@ V2 will bring in optimizations and enhancements, such as optimizing uploading an ## V2 -- [ ] Confirm the HMAC algorithm -> if you create a draft using integration test, and then use the web frontend to finish the upload (you will see overwrite pop-up), and then use the web frontend to upload again the same file, but this time you will have 2 files with duplicated names -- [ ] Might have missing signature issues on some old accounts, e.g. GetHashKey on rootLink might fail -> currently have a quick patch, but might need to double check the behavior -- [ ] Mimetype detection by [using the file content itself](github.com/gabriel-vasile/mimetype) +- [ ] Support thumbnail +- [ ] Potential bugs + - [ ] Confirm the HMAC algorithm -> if you create a draft using integration test, and then use the web frontend to finish the upload (you will see overwrite pop-up), and then use the web frontend to upload again the same file, but this time you will have 2 files with duplicated names + - [ ] Might have missing signature issues on some old accounts, e.g. GetHashKey on rootLink might fail -> currently have a quick patch, but might need to double check the behavior + - [ ] Double check the attrs field parsing, esp. for size + - [ ] Double check the attrs field, esp. for size +- [ ] Crypto-related operations, e.g. signature verification, still needs to cross check with iOS or web open source codebase +- [ ] Mimetype detection by [using the file content itself](github.com/gabriel-vasile/mimetype), or Google content sniffer - [ ] Remove e.g. proton.link related exposures in the function signature (this library should abstract them all) -- [ ] Documentation -- [ ] Handle failed / interrupted upload -> file the bug report to ProtonMail +- [ ] Improve documentation - [ ] Go through Drive iOS source code and check the logic control flow -- [ ] Figure out the bottleneck by doing some profiling - [ ] File - [ ] Parallel download / upload -> enc/dec is expensive - [ ] [Filename encoding](https://github.com/ProtonMail/WebClients/blob/b4eba99d241af4fdae06ff7138bd651a40ef5d3c/applications/drive/src/app/store/_links/validation.ts#L51) - [ ] Commit back to proton-go-api and switch to using upstream (make sure the tag is at the tip though) - [ ] Support legacy 2-password mode -- [ ] Support thumbnail - [ ] Proton Drive init (no prior Proton Drive login before -> probably will have no key, volume, etc. to start with at all) - [ ] linkID caching -> would need to listen to the event api though - [ ] Integration tests diff --git a/common/config.go b/common/config.go index dbe0fa0..a92bde6 100644 --- a/common/config.go +++ b/common/config.go @@ -1,6 +1,10 @@ package common -import "os" +import ( + "log" + "os" + "runtime" +) type Config struct { /* Constants */ @@ -18,6 +22,8 @@ type Config struct { EmptyTrashAfterIntegrationTest bool // CAUTION: the integration test will clean up all the data in the trash ReplaceExistingDraft bool // for the file upload replace or keep it as-is option EnableCaching bool // link node caching + ConcurrentBlockUploadCount int + ConcurrentFileCryptoCount int /* Drive */ DataFolderName string @@ -37,6 +43,8 @@ type ReusableCredentialData struct { } func NewConfigWithDefaultValues() *Config { + log.Println("Number of CPUs", runtime.GOMAXPROCS(0)) + return &Config{ AppVersion: "", UserAgent: "", @@ -59,12 +67,16 @@ func NewConfigWithDefaultValues() *Config { EmptyTrashAfterIntegrationTest: false, ReplaceExistingDraft: false, EnableCaching: true, + ConcurrentBlockUploadCount: 20, // let's be a nice citizen and not stress out proton engineers :) + ConcurrentFileCryptoCount: runtime.GOMAXPROCS(0), DataFolderName: "data", } } func NewConfigForIntegrationTests() *Config { + log.Println("Number of CPUs", runtime.GOMAXPROCS(0)) + appVersion := os.Getenv("PROTON_API_BRIDGE_APP_VERSION") userAgent := os.Getenv("PROTON_API_BRIDGE_USER_AGENT") @@ -105,6 +117,8 @@ func NewConfigForIntegrationTests() *Config { EmptyTrashAfterIntegrationTest: true, ReplaceExistingDraft: false, EnableCaching: true, + ConcurrentBlockUploadCount: 20, + ConcurrentFileCryptoCount: runtime.GOMAXPROCS(0), DataFolderName: "data", } diff --git a/drive.go b/drive.go index 2295c34..d9e2723 100644 --- a/drive.go +++ b/drive.go @@ -5,6 +5,7 @@ import ( "log" "github.com/henrybear327/Proton-API-Bridge/common" + "golang.org/x/sync/semaphore" "github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/henrybear327/go-proton-api" @@ -26,7 +27,9 @@ type ProtonDrive struct { addrData []proton.Address signatureAddress string - cache *cache + cache *cache + blockUploadSemaphore *semaphore.Weighted + blockCryptoSemaphore *semaphore.Weighted } func NewDefaultConfig() *common.Config { @@ -143,7 +146,9 @@ func NewProtonDrive(ctx context.Context, config *common.Config, authHandler prot addrData: addrData, signatureAddress: mainShare.Creator, - cache: newCache(config.EnableCaching), + cache: newCache(config.EnableCaching), + blockUploadSemaphore: semaphore.NewWeighted(int64(config.ConcurrentBlockUploadCount)), + blockCryptoSemaphore: semaphore.NewWeighted(int64(config.ConcurrentFileCryptoCount)), }, credentials, nil } diff --git a/file.go b/file.go index b055f23..5481cc6 100644 --- a/file.go +++ b/file.go @@ -206,10 +206,7 @@ func (protonDrive *ProtonDrive) DownloadFile(ctx context.Context, link *proton.L return nil, 0, nil, err } - if fileSystemAttrs != nil { - return reader, link.Size, fileSystemAttrs, nil - } - return reader, link.Size, nil, nil + return reader, link.Size, fileSystemAttrs, nil } func (reader *FileDownloadReader) downloadFileOnRead() error { @@ -496,6 +493,14 @@ func (protonDrive *ProtonDrive) uploadAndCollectBlockData(ctx context.Context, n errChan := make(chan error) uploadBlockWrapper := func(ctx context.Context, errChan chan error, bareURL, token string, block io.Reader) { + // log.Println("Before semaphore") + if err := protonDrive.blockUploadSemaphore.Acquire(ctx, 1); err != nil { + errChan <- err + } + defer protonDrive.blockUploadSemaphore.Release(1) + // log.Println("After semaphore") + // defer log.Println("Release semaphore") + errChan <- protonDrive.c.UploadBlock(ctx, bareURL, token, block) } for i := range blockUploadResp {