Better naming

This commit is contained in:
Michal Horejsek
2020-09-16 16:23:55 +02:00
parent 7337f78d4a
commit a73b30ed9e
2 changed files with 27 additions and 27 deletions

View File

@ -33,8 +33,8 @@ type PMAPIProvider struct {
addressID string
keyRing *crypto.KeyRing
importMsgReqMap map[string]*pmapi.ImportMsgReq // Key is msg transfer ID.
importMsgReqSize int
nextImportRequests map[string]*pmapi.ImportMsgReq // Key is msg transfer ID.
nextImportRequestsSize int
timeIt *timeIt
}
@ -47,8 +47,8 @@ func NewPMAPIProvider(config *pmapi.ClientConfig, clientManager ClientManager, u
userID: userID,
addressID: addressID,
importMsgReqMap: map[string]*pmapi.ImportMsgReq{},
importMsgReqSize: 0,
nextImportRequests: map[string]*pmapi.ImportMsgReq{},
nextImportRequestsSize: 0,
timeIt: newTimeIt("pmapi"),
}

View File

@ -33,7 +33,7 @@ import (
const (
pmapiImportBatchMaxItems = 10
pmapiImportBatchMaxSize = 25 * 1000 * 1000 // 25 MB
pmapiImportWorkers = 5
pmapiImportWorkers = 4 // To keep memory under 1 GB.
)
// DefaultMailboxes returns the default mailboxes for default rules if no other is found.
@ -79,11 +79,11 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch
// Cache has to be cleared before each transfer to not contain
// old stuff from previous cancelled run.
p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{}
p.importMsgReqSize = 0
p.nextImportRequests = map[string]*pmapi.ImportMsgReq{}
p.nextImportRequestsSize = 0
importMsgReqMapCh := make(chan map[string]*pmapi.ImportMsgReq)
wg := p.runImporting(progress, importMsgReqMapCh)
preparedImportRequestsCh := make(chan map[string]*pmapi.ImportMsgReq)
wg := p.startImportWorkers(progress, preparedImportRequestsCh)
for msg := range ch {
if progress.shouldStop() {
@ -93,14 +93,14 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch
if p.isMessageDraft(msg) {
p.transferDraft(rules, progress, msg)
} else {
p.transferMessage(rules, progress, msg, importMsgReqMapCh)
p.transferMessage(rules, progress, msg, preparedImportRequestsCh)
}
}
if len(p.importMsgReqMap) > 0 {
importMsgReqMapCh <- p.importMsgReqMap
if len(p.nextImportRequests) > 0 {
preparedImportRequestsCh <- p.nextImportRequests
}
close(importMsgReqMapCh)
close(preparedImportRequestsCh)
wg.Wait()
}
@ -170,7 +170,7 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string
return draft.ID, nil
}
func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message, importMsgReqMapCh chan map[string]*pmapi.ImportMsgReq) {
func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message, preparedImportRequestsCh chan map[string]*pmapi.ImportMsgReq) {
importMsgReq, err := p.generateImportMsgReq(msg, rules.globalMailbox)
if err != nil {
progress.messageImported(msg.ID, "", err)
@ -178,13 +178,13 @@ func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress,
}
importMsgReqSize := len(importMsgReq.Body)
if p.importMsgReqSize+importMsgReqSize > pmapiImportBatchMaxSize || len(p.importMsgReqMap) == pmapiImportBatchMaxItems {
importMsgReqMapCh <- p.importMsgReqMap
p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{}
p.importMsgReqSize = 0
if p.nextImportRequestsSize+importMsgReqSize > pmapiImportBatchMaxSize || len(p.nextImportRequests) == pmapiImportBatchMaxItems {
preparedImportRequestsCh <- p.nextImportRequests
p.nextImportRequests = map[string]*pmapi.ImportMsgReq{}
p.nextImportRequestsSize = 0
}
p.importMsgReqMap[msg.ID] = importMsgReq
p.importMsgReqSize += importMsgReqSize
p.nextImportRequests[msg.ID] = importMsgReq
p.nextImportRequestsSize += importMsgReqSize
}
func (p *PMAPIProvider) generateImportMsgReq(msg Message, globalMailbox *Mailbox) (*pmapi.ImportMsgReq, error) {
@ -276,13 +276,13 @@ func computeMessageFlags(labels []string) (flag int64) {
return flag
}
func (p *PMAPIProvider) runImporting(progress *Progress, importMsgReqMapCh chan map[string]*pmapi.ImportMsgReq) *sync.WaitGroup {
func (p *PMAPIProvider) startImportWorkers(progress *Progress, preparedImportRequestsCh chan map[string]*pmapi.ImportMsgReq) *sync.WaitGroup {
var wg sync.WaitGroup
wg.Add(pmapiImportWorkers)
for i := 0; i < pmapiImportWorkers; i++ {
wg.Add(1)
go func() {
for importMsgReqMap := range importMsgReqMapCh {
p.importMessages(progress, importMsgReqMap)
for importRequests := range preparedImportRequestsCh {
p.importMessages(progress, importRequests)
}
wg.Done()
}()
@ -290,14 +290,14 @@ func (p *PMAPIProvider) runImporting(progress *Progress, importMsgReqMapCh chan
return &wg
}
func (p *PMAPIProvider) importMessages(progress *Progress, importMsgReqMap map[string]*pmapi.ImportMsgReq) {
func (p *PMAPIProvider) importMessages(progress *Progress, importRequests map[string]*pmapi.ImportMsgReq) {
if progress.shouldStop() {
return
}
importMsgIDs := []string{}
importMsgRequests := []*pmapi.ImportMsgReq{}
for msgID, req := range importMsgReqMap {
for msgID, req := range importRequests {
importMsgIDs = append(importMsgIDs, msgID)
importMsgRequests = append(importMsgRequests, req)
}
@ -307,7 +307,7 @@ func (p *PMAPIProvider) importMessages(progress *Progress, importMsgReqMap map[s
// In case the whole request failed, try to import every message one by one.
if err != nil || len(results) == 0 {
log.WithError(err).Warning("Importing messages failed, trying one by one")
for msgID, req := range importMsgReqMap {
for msgID, req := range importRequests {
importedID, err := p.importMessage(msgID, progress, req)
progress.messageImported(msgID, importedID, err)
}