diff --git a/Changelog.md b/Changelog.md index e0991505..03f01150 100644 --- a/Changelog.md +++ b/Changelog.md @@ -14,6 +14,7 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/) * Unsilent errors reading mbox files. * GODT-692 QA build with option to change API URL by ENV variable. * GODT-704 User agent detected by fake IMAP extension instead of AUTH callback (some clients use LOGIN instead of AUTH). +* GODT-695 Parallel upload for ProtonMail target. ### Removed * GODT-519 Unused AUTH scope parsing methods. diff --git a/internal/transfer/provider_pmapi_target.go b/internal/transfer/provider_pmapi_target.go index 50d3776b..579ae61e 100644 --- a/internal/transfer/provider_pmapi_target.go +++ b/internal/transfer/provider_pmapi_target.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "io/ioutil" + "sync" pkgMessage "github.com/ProtonMail/proton-bridge/pkg/message" "github.com/ProtonMail/proton-bridge/pkg/pmapi" @@ -32,6 +33,7 @@ import ( const ( pmapiImportBatchMaxItems = 10 pmapiImportBatchMaxSize = 25 * 1000 * 1000 // 25 MB + pmapiImportWorkers = 5 ) // DefaultMailboxes returns the default mailboxes for default rules if no other is found. @@ -80,6 +82,9 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{} p.importMsgReqSize = 0 + importMsgReqMapCh := make(chan map[string]*pmapi.ImportMsgReq) + wg := p.runImporting(progress, importMsgReqMapCh) + for msg := range ch { if progress.shouldStop() { break @@ -88,13 +93,15 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch if p.isMessageDraft(msg) { p.transferDraft(rules, progress, msg) } else { - p.transferMessage(rules, progress, msg) + p.transferMessage(rules, progress, msg, importMsgReqMapCh) } } if len(p.importMsgReqMap) > 0 { - p.importMessages(progress) + importMsgReqMapCh <- p.importMsgReqMap } + close(importMsgReqMapCh) + wg.Wait() } func (p *PMAPIProvider) isMessageDraft(msg Message) bool { @@ -163,7 +170,7 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string return draft.ID, nil } -func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message) { +func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message, importMsgReqMapCh chan map[string]*pmapi.ImportMsgReq) { importMsgReq, err := p.generateImportMsgReq(msg, rules.globalMailbox) if err != nil { progress.messageImported(msg.ID, "", err) @@ -172,7 +179,9 @@ func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, importMsgReqSize := len(importMsgReq.Body) if p.importMsgReqSize+importMsgReqSize > pmapiImportBatchMaxSize || len(p.importMsgReqMap) == pmapiImportBatchMaxItems { - p.importMessages(progress) + importMsgReqMapCh <- p.importMsgReqMap + p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{} + p.importMsgReqSize = 0 } p.importMsgReqMap[msg.ID] = importMsgReq p.importMsgReqSize += importMsgReqSize @@ -267,24 +276,38 @@ func computeMessageFlags(labels []string) (flag int64) { return flag } -func (p *PMAPIProvider) importMessages(progress *Progress) { +func (p *PMAPIProvider) runImporting(progress *Progress, importMsgReqMapCh chan map[string]*pmapi.ImportMsgReq) *sync.WaitGroup { + var wg sync.WaitGroup + for i := 0; i < pmapiImportWorkers; i++ { + wg.Add(1) + go func() { + for importMsgReqMap := range importMsgReqMapCh { + p.importMessages(progress, importMsgReqMap) + } + wg.Done() + }() + } + return &wg +} + +func (p *PMAPIProvider) importMessages(progress *Progress, importMsgReqMap map[string]*pmapi.ImportMsgReq) { if progress.shouldStop() { return } importMsgIDs := []string{} importMsgRequests := []*pmapi.ImportMsgReq{} - for msgID, req := range p.importMsgReqMap { + for msgID, req := range importMsgReqMap { importMsgIDs = append(importMsgIDs, msgID) importMsgRequests = append(importMsgRequests, req) } - log.WithField("msgIDs", importMsgIDs).WithField("size", p.importMsgReqSize).Debug("Importing messages") + log.WithField("msgIDs", importMsgIDs).Trace("Importing messages") results, err := p.importRequest(importMsgIDs[0], importMsgRequests) // In case the whole request failed, try to import every message one by one. if err != nil || len(results) == 0 { log.WithError(err).Warning("Importing messages failed, trying one by one") - for msgID, req := range p.importMsgReqMap { + for msgID, req := range importMsgReqMap { importedID, err := p.importMessage(msgID, progress, req) progress.messageImported(msgID, importedID, err) } @@ -303,9 +326,6 @@ func (p *PMAPIProvider) importMessages(progress *Progress) { progress.messageImported(msgID, result.MessageID, nil) } } - - p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{} - p.importMsgReqSize = 0 } func (p *PMAPIProvider) importMessage(msgSourceID string, progress *Progress, req *pmapi.ImportMsgReq) (importedID string, importedErr error) {