From f3773c9d7854694037d735893b580e664087b609 Mon Sep 17 00:00:00 2001 From: Michal Horejsek Date: Fri, 11 Sep 2020 15:09:37 +0200 Subject: [PATCH] I/E measurements --- Changelog.md | 1 + internal/transfer/progress.go | 19 +++-- internal/transfer/provider_imap.go | 4 ++ internal/transfer/provider_imap_source.go | 11 +++ internal/transfer/provider_pmapi.go | 4 ++ internal/transfer/provider_pmapi_source.go | 6 ++ internal/transfer/provider_pmapi_target.go | 30 +++++--- internal/transfer/provider_pmapi_utils.go | 23 ++++++- internal/transfer/timeit.go | 80 ++++++++++++++++++++++ internal/transfer/transfer.go | 5 +- 10 files changed, 165 insertions(+), 18 deletions(-) create mode 100644 internal/transfer/timeit.go diff --git a/Changelog.md b/Changelog.md index eb8e7ae6..59a15655 100644 --- a/Changelog.md +++ b/Changelog.md @@ -7,6 +7,7 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/) ### Added * GODT-682 Persistent anonymous API cookies for Import-Export. * GODT-357 Use go-message to make a better message parser. +* GODT-720 Time measurement of progress for Import-Export. ### Changed * GODT-511 User agent format changed. diff --git a/internal/transfer/progress.go b/internal/transfer/progress.go index b4873077..97b2aaf7 100644 --- a/internal/transfer/progress.go +++ b/internal/transfer/progress.go @@ -62,11 +62,20 @@ func (p *Progress) update() { return } - // In case no one listens for an update, do not block the progress. - select { - case p.updateCh <- struct{}{}: - case <-time.After(100 * time.Millisecond): - } + // In case no one listens for an update, do not block the whole progress. + go func() { + defer func() { + // updateCh can be closed at the end of progress which is fine. + if r := recover(); r != nil { + log.WithField("r", r).Warn("Failed to send update") + } + }() + + select { + case p.updateCh <- struct{}{}: + case <-time.After(5 * time.Millisecond): + } + }() } // finish should be called as the last call once everything is done. diff --git a/internal/transfer/provider_imap.go b/internal/transfer/provider_imap.go index 0d96c04e..c2669c49 100644 --- a/internal/transfer/provider_imap.go +++ b/internal/transfer/provider_imap.go @@ -31,6 +31,8 @@ type IMAPProvider struct { addr string client *imapClient.Client + + timeIt *timeIt } // NewIMAPProvider returns new IMAPProvider. @@ -39,6 +41,8 @@ func NewIMAPProvider(username, password, host, port string) (*IMAPProvider, erro username: username, password: password, addr: net.JoinHostPort(host, port), + + timeIt: newTimeIt("imap"), } if err := p.auth(); err != nil { diff --git a/internal/transfer/provider_imap_source.go b/internal/transfer/provider_imap_source.go index 1894487e..e732dbf2 100644 --- a/internal/transfer/provider_imap_source.go +++ b/internal/transfer/provider_imap_source.go @@ -40,6 +40,9 @@ func (p *IMAPProvider) TransferTo(rules transferRules, progress *Progress, ch ch log.Info("Started transfer from IMAP to channel") defer log.Info("Finished transfer from IMAP to channel") + p.timeIt.clear() + defer p.timeIt.logResults() + imapMessageInfoMap := p.loadMessageInfoMap(rules, progress) for rule := range rules.iterateActiveRules() { @@ -78,6 +81,9 @@ func (p *IMAPProvider) loadMessageInfoMap(rules transferRules, progress *Progres } func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValidity, count uint32) map[string]imapMessageInfo { + p.timeIt.start("load", rule.SourceMailbox.Name) + defer p.timeIt.stop("load", rule.SourceMailbox.Name) + messagesInfo := map[string]imapMessageInfo{} pageStart := uint32(1) @@ -199,13 +205,18 @@ func (p *IMAPProvider) exportMessages(rule *Rule, progress *Progress, ch chan<- progress.messageExported(id, body, err) if err == nil { msg := p.exportMessage(rule, id, imapMessage, body) + + p.timeIt.stop("fetch", rule.SourceMailbox.Name) ch <- msg + p.timeIt.start("fetch", rule.SourceMailbox.Name) } } + p.timeIt.start("fetch", rule.SourceMailbox.Name) progress.callWrap(func() error { return p.uidFetch(rule.SourceMailbox.Name, seqSet, items, processMessageCallback) }) + p.timeIt.stop("fetch", rule.SourceMailbox.Name) } func (p *IMAPProvider) exportMessage(rule *Rule, id string, imapMessage *imap.Message, body []byte) Message { diff --git a/internal/transfer/provider_pmapi.go b/internal/transfer/provider_pmapi.go index 33210b9b..d53a2f49 100644 --- a/internal/transfer/provider_pmapi.go +++ b/internal/transfer/provider_pmapi.go @@ -35,6 +35,8 @@ type PMAPIProvider struct { importMsgReqMap map[string]*pmapi.ImportMsgReq // Key is msg transfer ID. importMsgReqSize int + + timeIt *timeIt } // NewPMAPIProvider returns new PMAPIProvider. @@ -47,6 +49,8 @@ func NewPMAPIProvider(config *pmapi.ClientConfig, clientManager ClientManager, u importMsgReqMap: map[string]*pmapi.ImportMsgReq{}, importMsgReqSize: 0, + + timeIt: newTimeIt("pmapi"), } if addressID != "" { diff --git a/internal/transfer/provider_pmapi_source.go b/internal/transfer/provider_pmapi_source.go index cf6541fc..3bd870e7 100644 --- a/internal/transfer/provider_pmapi_source.go +++ b/internal/transfer/provider_pmapi_source.go @@ -34,6 +34,9 @@ func (p *PMAPIProvider) TransferTo(rules transferRules, progress *Progress, ch c log.Info("Started transfer from PMAPI to channel") defer log.Info("Finished transfer from PMAPI to channel") + p.timeIt.clear() + defer p.timeIt.logResults() + // TransferTo cannot end sooner than loadCounts goroutine because // loadCounts writes to channel in progress which would be closed. // That can happen for really small accounts. @@ -147,6 +150,9 @@ func (p *PMAPIProvider) exportMessage(rule *Rule, progress *Progress, pmapiMsgID return err }) + p.timeIt.start("build", msgID) + defer p.timeIt.stop("build", msgID) + msgBuilder := pkgMessage.NewBuilder(p.client(), msg) msgBuilder.EncryptedToHTML = false _, body, err := msgBuilder.BuildMessage() diff --git a/internal/transfer/provider_pmapi_target.go b/internal/transfer/provider_pmapi_target.go index 9fd3d3af..50d3776b 100644 --- a/internal/transfer/provider_pmapi_target.go +++ b/internal/transfer/provider_pmapi_target.go @@ -72,6 +72,9 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch log.Info("Started transfer from channel to PMAPI") defer log.Info("Finished transfer from channel to PMAPI") + p.timeIt.clear() + defer p.timeIt.logResults() + // Cache has to be cleared before each transfer to not contain // old stuff from previous cancelled run. p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{} @@ -114,7 +117,10 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string return "", errors.Wrap(err, "failed to parse message") } - if err := message.Encrypt(p.keyRing, nil); err != nil { + p.timeIt.start("encrypt", msg.ID) + err = message.Encrypt(p.keyRing, nil) + p.timeIt.stop("encrypt", msg.ID) + if err != nil { return "", errors.Wrap(err, "failed to encrypt draft") } @@ -125,7 +131,7 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string attachments := message.Attachments message.Attachments = nil - draft, err := p.createDraft(message, "", pmapi.DraftActionReply) + draft, err := p.createDraft(msg.ID, message, "", pmapi.DraftActionReply) if err != nil { return "", errors.Wrap(err, "failed to create draft") } @@ -140,13 +146,15 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string return "", errors.Wrap(err, "failed to sign attachment") } + p.timeIt.start("encrypt", msg.ID) r = bytes.NewReader(attachmentBody) encReader, err := attachment.Encrypt(p.keyRing, r) + p.timeIt.stop("encrypt", msg.ID) if err != nil { return "", errors.Wrap(err, "failed to encrypt attachment") } - _, err = p.createAttachment(attachment, encReader, sigReader) + _, err = p.createAttachment(msg.ID, attachment, encReader, sigReader) if err != nil { return "", errors.Wrap(err, "failed to create attachment") } @@ -176,7 +184,9 @@ func (p *PMAPIProvider) generateImportMsgReq(msg Message, globalMailbox *Mailbox return nil, errors.Wrap(err, "failed to parse message") } + p.timeIt.start("encrypt", msg.ID) body, err := p.encryptMessage(message, attachmentReaders) + p.timeIt.stop("encrypt", msg.ID) if err != nil { return nil, errors.Wrap(err, "failed to encrypt message") } @@ -208,6 +218,9 @@ func (p *PMAPIProvider) generateImportMsgReq(msg Message, globalMailbox *Mailbox } func (p *PMAPIProvider) parseMessage(msg Message) (m *pmapi.Message, r []io.Reader, err error) { + p.timeIt.start("parse", msg.ID) + defer p.timeIt.stop("parse", msg.ID) + // Old message parser is panicking in some cases. // Instead of crashing we try to convert to regular error. defer func() { @@ -265,15 +278,14 @@ func (p *PMAPIProvider) importMessages(progress *Progress) { importMsgIDs = append(importMsgIDs, msgID) importMsgRequests = append(importMsgRequests, req) } - log.WithField("msgIDs", importMsgIDs).WithField("size", p.importMsgReqSize).Debug("Importing messages") - results, err := p.importRequest(importMsgRequests) + results, err := p.importRequest(importMsgIDs[0], importMsgRequests) // In case the whole request failed, try to import every message one by one. if err != nil || len(results) == 0 { log.WithError(err).Warning("Importing messages failed, trying one by one") for msgID, req := range p.importMsgReqMap { - importedID, err := p.importMessage(progress, req) + importedID, err := p.importMessage(msgID, progress, req) progress.messageImported(msgID, importedID, err) } return @@ -285,7 +297,7 @@ func (p *PMAPIProvider) importMessages(progress *Progress) { if result.Error != nil { log.WithError(result.Error).WithField("msg", msgID).Warning("Importing message failed, trying alone") req := importMsgRequests[index] - importedID, err := p.importMessage(progress, req) + importedID, err := p.importMessage(msgID, progress, req) progress.messageImported(msgID, importedID, err) } else { progress.messageImported(msgID, result.MessageID, nil) @@ -296,9 +308,9 @@ func (p *PMAPIProvider) importMessages(progress *Progress) { p.importMsgReqSize = 0 } -func (p *PMAPIProvider) importMessage(progress *Progress, req *pmapi.ImportMsgReq) (importedID string, importedErr error) { +func (p *PMAPIProvider) importMessage(msgSourceID string, progress *Progress, req *pmapi.ImportMsgReq) (importedID string, importedErr error) { progress.callWrap(func() error { - results, err := p.importRequest([]*pmapi.ImportMsgReq{req}) + results, err := p.importRequest(msgSourceID, []*pmapi.ImportMsgReq{req}) if err != nil { return errors.Wrap(err, "failed to import messages") } diff --git a/internal/transfer/provider_pmapi_utils.go b/internal/transfer/provider_pmapi_utils.go index 0633e52e..25c8bc07 100644 --- a/internal/transfer/provider_pmapi_utils.go +++ b/internal/transfer/provider_pmapi_utils.go @@ -18,6 +18,7 @@ package transfer import ( + "fmt" "io" "time" @@ -71,6 +72,10 @@ func (p *PMAPIProvider) tryReconnect() error { func (p *PMAPIProvider) listMessages(filter *pmapi.MessagesFilter) (messages []*pmapi.Message, count int, err error) { err = p.ensureConnection(func() error { + key := fmt.Sprintf("%s_%d", filter.LabelID, filter.Page) + p.timeIt.start("listing", key) + defer p.timeIt.stop("listing", key) + messages, count, err = p.client().ListMessages(filter) return err }) @@ -79,30 +84,42 @@ func (p *PMAPIProvider) listMessages(filter *pmapi.MessagesFilter) (messages []* func (p *PMAPIProvider) getMessage(msgID string) (message *pmapi.Message, err error) { err = p.ensureConnection(func() error { + p.timeIt.start("download", msgID) + defer p.timeIt.stop("download", msgID) + message, err = p.client().GetMessage(msgID) return err }) return } -func (p *PMAPIProvider) importRequest(req []*pmapi.ImportMsgReq) (res []*pmapi.ImportMsgRes, err error) { +func (p *PMAPIProvider) importRequest(msgSourceID string, req []*pmapi.ImportMsgReq) (res []*pmapi.ImportMsgRes, err error) { err = p.ensureConnection(func() error { + p.timeIt.start("upload", msgSourceID) + defer p.timeIt.stop("upload", msgSourceID) + res, err = p.client().Import(req) return err }) return } -func (p *PMAPIProvider) createDraft(message *pmapi.Message, parent string, action int) (draft *pmapi.Message, err error) { +func (p *PMAPIProvider) createDraft(msgSourceID string, message *pmapi.Message, parent string, action int) (draft *pmapi.Message, err error) { err = p.ensureConnection(func() error { + p.timeIt.start("upload", msgSourceID) + defer p.timeIt.stop("upload", msgSourceID) + draft, err = p.client().CreateDraft(message, parent, action) return err }) return } -func (p *PMAPIProvider) createAttachment(att *pmapi.Attachment, r io.Reader, sig io.Reader) (created *pmapi.Attachment, err error) { +func (p *PMAPIProvider) createAttachment(msgSourceID string, att *pmapi.Attachment, r io.Reader, sig io.Reader) (created *pmapi.Attachment, err error) { err = p.ensureConnection(func() error { + p.timeIt.start("upload", msgSourceID) + defer p.timeIt.stop("upload", msgSourceID) + created, err = p.client().CreateAttachment(att, r, sig) return err }) diff --git a/internal/transfer/timeit.go b/internal/transfer/timeit.go new file mode 100644 index 00000000..23bfd4e1 --- /dev/null +++ b/internal/transfer/timeit.go @@ -0,0 +1,80 @@ +// Copyright (c) 2020 Proton Technologies AG +// +// This file is part of ProtonMail Bridge. +// +// ProtonMail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// ProtonMail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with ProtonMail Bridge. If not, see . + +package transfer + +import ( + "sync" + "time" +) + +type timeIt struct { + lock sync.Locker + name string + groups map[string]int64 + ongoing map[string]time.Time +} + +func newTimeIt(name string) *timeIt { + return &timeIt{ + lock: &sync.Mutex{}, + name: name, + groups: map[string]int64{}, + ongoing: map[string]time.Time{}, + } +} + +func (t *timeIt) clear() { + t.lock.Lock() + defer t.lock.Unlock() + + t.groups = map[string]int64{} + t.ongoing = map[string]time.Time{} +} + +func (t *timeIt) start(group, id string) { + t.lock.Lock() + defer t.lock.Unlock() + + t.ongoing[group+"/"+id] = time.Now() +} + +func (t *timeIt) stop(group, id string) { + endTime := time.Now() + + t.lock.Lock() + defer t.lock.Unlock() + + startTime, ok := t.ongoing[group+"/"+id] + if !ok { + log.WithField("group", group).WithField("id", id).Error("Stop called before start") + return + } + delete(t.ongoing, group+"/"+id) + + diff := endTime.Sub(startTime).Milliseconds() + t.groups[group] += diff +} + +func (t *timeIt) logResults() { + t.lock.Lock() + defer t.lock.Unlock() + + // Print also ongoing to be sure that nothing was left out. + // Basically ongoing should be empty. + log.WithField("name", t.name).WithField("result", t.groups).WithField("ongoing", t.ongoing).Debug("Time measurement") +} diff --git a/internal/transfer/transfer.go b/internal/transfer/transfer.go index 7ae8812b..1bf6f7ef 100644 --- a/internal/transfer/transfer.go +++ b/internal/transfer/transfer.go @@ -181,7 +181,10 @@ func (t *Transfer) Start() *Progress { reportFile := newFileReport(t.logDir, t.id) progress := newProgress(log, reportFile) - ch := make(chan Message) + // Small queue to prevent having idle source while target is blocked. + // E.g., when upload to PM is in progress, we can in meantime download + // the next batch from remote IMAP server. + ch := make(chan Message, 10) go func() { defer t.panicHandler.HandlePanic()