diff --git a/internal/transfer/provider_pmapi.go b/internal/transfer/provider_pmapi.go index d53a2f49..7b11f43c 100644 --- a/internal/transfer/provider_pmapi.go +++ b/internal/transfer/provider_pmapi.go @@ -33,8 +33,8 @@ type PMAPIProvider struct { addressID string keyRing *crypto.KeyRing - importMsgReqMap map[string]*pmapi.ImportMsgReq // Key is msg transfer ID. - importMsgReqSize int + nextImportRequests map[string]*pmapi.ImportMsgReq // Key is msg transfer ID. + nextImportRequestsSize int timeIt *timeIt } @@ -47,8 +47,8 @@ func NewPMAPIProvider(config *pmapi.ClientConfig, clientManager ClientManager, u userID: userID, addressID: addressID, - importMsgReqMap: map[string]*pmapi.ImportMsgReq{}, - importMsgReqSize: 0, + nextImportRequests: map[string]*pmapi.ImportMsgReq{}, + nextImportRequestsSize: 0, timeIt: newTimeIt("pmapi"), } diff --git a/internal/transfer/provider_pmapi_target.go b/internal/transfer/provider_pmapi_target.go index 579ae61e..87e25eae 100644 --- a/internal/transfer/provider_pmapi_target.go +++ b/internal/transfer/provider_pmapi_target.go @@ -33,7 +33,7 @@ import ( const ( pmapiImportBatchMaxItems = 10 pmapiImportBatchMaxSize = 25 * 1000 * 1000 // 25 MB - pmapiImportWorkers = 5 + pmapiImportWorkers = 4 // To keep memory under 1 GB. ) // DefaultMailboxes returns the default mailboxes for default rules if no other is found. @@ -79,11 +79,11 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch // Cache has to be cleared before each transfer to not contain // old stuff from previous cancelled run. - p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{} - p.importMsgReqSize = 0 + p.nextImportRequests = map[string]*pmapi.ImportMsgReq{} + p.nextImportRequestsSize = 0 - importMsgReqMapCh := make(chan map[string]*pmapi.ImportMsgReq) - wg := p.runImporting(progress, importMsgReqMapCh) + preparedImportRequestsCh := make(chan map[string]*pmapi.ImportMsgReq) + wg := p.startImportWorkers(progress, preparedImportRequestsCh) for msg := range ch { if progress.shouldStop() { @@ -93,14 +93,14 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch if p.isMessageDraft(msg) { p.transferDraft(rules, progress, msg) } else { - p.transferMessage(rules, progress, msg, importMsgReqMapCh) + p.transferMessage(rules, progress, msg, preparedImportRequestsCh) } } - if len(p.importMsgReqMap) > 0 { - importMsgReqMapCh <- p.importMsgReqMap + if len(p.nextImportRequests) > 0 { + preparedImportRequestsCh <- p.nextImportRequests } - close(importMsgReqMapCh) + close(preparedImportRequestsCh) wg.Wait() } @@ -170,7 +170,7 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string return draft.ID, nil } -func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message, importMsgReqMapCh chan map[string]*pmapi.ImportMsgReq) { +func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message, preparedImportRequestsCh chan map[string]*pmapi.ImportMsgReq) { importMsgReq, err := p.generateImportMsgReq(msg, rules.globalMailbox) if err != nil { progress.messageImported(msg.ID, "", err) @@ -178,13 +178,13 @@ func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, } importMsgReqSize := len(importMsgReq.Body) - if p.importMsgReqSize+importMsgReqSize > pmapiImportBatchMaxSize || len(p.importMsgReqMap) == pmapiImportBatchMaxItems { - importMsgReqMapCh <- p.importMsgReqMap - p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{} - p.importMsgReqSize = 0 + if p.nextImportRequestsSize+importMsgReqSize > pmapiImportBatchMaxSize || len(p.nextImportRequests) == pmapiImportBatchMaxItems { + preparedImportRequestsCh <- p.nextImportRequests + p.nextImportRequests = map[string]*pmapi.ImportMsgReq{} + p.nextImportRequestsSize = 0 } - p.importMsgReqMap[msg.ID] = importMsgReq - p.importMsgReqSize += importMsgReqSize + p.nextImportRequests[msg.ID] = importMsgReq + p.nextImportRequestsSize += importMsgReqSize } func (p *PMAPIProvider) generateImportMsgReq(msg Message, globalMailbox *Mailbox) (*pmapi.ImportMsgReq, error) { @@ -276,13 +276,13 @@ func computeMessageFlags(labels []string) (flag int64) { return flag } -func (p *PMAPIProvider) runImporting(progress *Progress, importMsgReqMapCh chan map[string]*pmapi.ImportMsgReq) *sync.WaitGroup { +func (p *PMAPIProvider) startImportWorkers(progress *Progress, preparedImportRequestsCh chan map[string]*pmapi.ImportMsgReq) *sync.WaitGroup { var wg sync.WaitGroup + wg.Add(pmapiImportWorkers) for i := 0; i < pmapiImportWorkers; i++ { - wg.Add(1) go func() { - for importMsgReqMap := range importMsgReqMapCh { - p.importMessages(progress, importMsgReqMap) + for importRequests := range preparedImportRequestsCh { + p.importMessages(progress, importRequests) } wg.Done() }() @@ -290,14 +290,14 @@ func (p *PMAPIProvider) runImporting(progress *Progress, importMsgReqMapCh chan return &wg } -func (p *PMAPIProvider) importMessages(progress *Progress, importMsgReqMap map[string]*pmapi.ImportMsgReq) { +func (p *PMAPIProvider) importMessages(progress *Progress, importRequests map[string]*pmapi.ImportMsgReq) { if progress.shouldStop() { return } importMsgIDs := []string{} importMsgRequests := []*pmapi.ImportMsgReq{} - for msgID, req := range importMsgReqMap { + for msgID, req := range importRequests { importMsgIDs = append(importMsgIDs, msgID) importMsgRequests = append(importMsgRequests, req) } @@ -307,7 +307,7 @@ func (p *PMAPIProvider) importMessages(progress *Progress, importMsgReqMap map[s // In case the whole request failed, try to import every message one by one. if err != nil || len(results) == 0 { log.WithError(err).Warning("Importing messages failed, trying one by one") - for msgID, req := range importMsgReqMap { + for msgID, req := range importRequests { importedID, err := p.importMessage(msgID, progress, req) progress.messageImported(msgID, importedID, err) }