Import/Export backend

This commit is contained in:
Michal Horejsek
2020-06-17 15:29:41 +02:00
parent 49316a935c
commit 1c10cc5065
107 changed files with 2869 additions and 743 deletions

View File

@ -41,13 +41,9 @@ func (m Mailbox) Hash() string {
// LeastUsedColor is intended to return color for creating a new inbox or label
func LeastUsedColor(mailboxes []Mailbox) string {
usedColors := []string{}
if mailboxes != nil {
for _, m := range mailboxes {
usedColors = append(usedColors, m.Color)
}
for _, m := range mailboxes {
usedColors = append(usedColors, m.Color)
}
return pmapi.LeastUsedColor(usedColors)
}

View File

@ -56,6 +56,10 @@ type MessageStatus struct {
Time time.Time
}
// String renders the status on one line in the form
// "<SourceID> (<Subject>, <From>, <Time>): <error message>".
func (status *MessageStatus) String() string {
	const layout = "%s (%s, %s, %s): %s"
	return fmt.Sprintf(
		layout,
		status.SourceID,
		status.Subject,
		status.From,
		status.Time,
		status.GetErrorMessage(),
	)
}
func (status *MessageStatus) setDetailsFromHeader(header mail.Header) {
dec := &mime.WordDecoder{}

View File

@ -139,8 +139,10 @@ func (p *Progress) messageExported(messageID string, body []byte, err error) {
p.log.WithField("id", messageID).WithError(err).Debug("Message exported")
status := p.messageStatuses[messageID]
status.exported = true
status.exportErr = err
if err == nil {
status.exported = true
}
if len(body) > 0 {
status.bodyHash = fmt.Sprintf("%x", sha256.Sum256(body))
@ -166,8 +168,10 @@ func (p *Progress) messageImported(messageID, importID string, err error) {
p.log.WithField("id", messageID).WithError(err).Debug("Message imported")
p.messageStatuses[messageID].targetID = importID
p.messageStatuses[messageID].imported = true
p.messageStatuses[messageID].importErr = err
if err == nil {
p.messageStatuses[messageID].imported = true
}
// Import is the last step, now we can log the result to the report file.
p.logMessage(messageID)

View File

@ -73,8 +73,8 @@ func TestProgressAddingMessages(t *testing.T) {
failed, imported, exported, added, _ := progress.GetCounts()
a.Equal(t, uint(4), added)
a.Equal(t, uint(4), exported)
a.Equal(t, uint(3), imported)
a.Equal(t, uint(2), exported)
a.Equal(t, uint(2), imported)
a.Equal(t, uint(3), failed)
errorsMap := map[string]string{}

View File

@ -68,7 +68,7 @@ func (p *IMAPProvider) loadMessageInfoMap(rules transferRules, progress *Progres
continue
}
messagesInfo := p.loadMessagesInfo(rule, progress, mailbox.UidValidity)
messagesInfo := p.loadMessagesInfo(rule, progress, mailbox.UidValidity, mailbox.Messages)
res[rule.SourceMailbox.Name] = messagesInfo
progress.updateCount(rule.SourceMailbox.Name, uint(len(messagesInfo)))
}
@ -76,7 +76,7 @@ func (p *IMAPProvider) loadMessageInfoMap(rules transferRules, progress *Progres
return res
}
func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValidity uint32) map[string]imapMessageInfo {
func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValidity, count uint32) map[string]imapMessageInfo {
messagesInfo := map[string]imapMessageInfo{}
pageStart := uint32(1)
@ -86,6 +86,11 @@ func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValid
break
}
// Some servers do not accept message sequence number higher than the total count.
if pageEnd > count {
pageEnd = count
}
seqSet := &imap.SeqSet{}
seqSet.AddRange(pageStart, pageEnd)
@ -114,7 +119,7 @@ func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValid
}
progress.callWrap(func() error {
return p.fetch(seqSet, items, processMessageCallback)
return p.fetch(rule.SourceMailbox.Name, seqSet, items, processMessageCallback)
})
if pageMsgCount < imapPageSize {
@ -145,7 +150,6 @@ func (p *IMAPProvider) transferTo(rule *Rule, messagesInfo map[string]imapMessag
if seqSetSize != 0 && (seqSetSize+messageInfo.size) > imapMaxFetchSize {
log.WithField("mailbox", rule.SourceMailbox.Name).WithField("seq", seqSet).WithField("size", seqSetSize).Debug("Fetching messages")
p.exportMessages(rule, progress, ch, seqSet, uidToID)
seqSet = &imap.SeqSet{}
@ -157,6 +161,11 @@ func (p *IMAPProvider) transferTo(rule *Rule, messagesInfo map[string]imapMessag
seqSetSize += messageInfo.size
uidToID[messageInfo.uid] = messageInfo.id
}
if len(uidToID) != 0 {
log.WithField("mailbox", rule.SourceMailbox.Name).WithField("seq", seqSet).WithField("size", seqSetSize).Debug("Fetching messages")
p.exportMessages(rule, progress, ch, seqSet, uidToID)
}
}
func (p *IMAPProvider) exportMessages(rule *Rule, progress *Progress, ch chan<- Message, seqSet *imap.SeqSet, uidToID map[uint32]string) {
@ -188,7 +197,7 @@ func (p *IMAPProvider) exportMessages(rule *Rule, progress *Progress, ch chan<-
}
progress.callWrap(func() error {
return p.uidFetch(seqSet, items, processMessageCallback)
return p.uidFetch(rule.SourceMailbox.Name, seqSet, items, processMessageCallback)
})
}

View File

@ -49,16 +49,11 @@ func (l *imapErrorLogger) Println(v ...interface{}) {
l.log.Errorln(v...)
}
// imapDebugLogger holds a logrus entry; its Write method passes every chunk of
// written data to that entry at trace level.
type imapDebugLogger struct { //nolint[unused]
log *logrus.Entry
}
// Write logs data as a single trace-level entry and reports all of it as
// written; it never returns an error.
func (l *imapDebugLogger) Write(data []byte) (int, error) {
	written := len(data)
	l.log.Trace(string(data))
	return written, nil
}
// ensureConnection runs callback through ensureConnectionAndSelection with an
// empty mailbox name, i.e. without asking for any mailbox to be selected.
func (p *IMAPProvider) ensureConnection(callback func() error) error {
	const noMailbox = ""
	return p.ensureConnectionAndSelection(callback, noMailbox)
}
func (p *IMAPProvider) ensureConnectionAndSelection(callback func() error, ensureSelectedIn string) error {
var callErr error
for i := 1; i <= imapRetries; i++ {
callErr = callback()
@ -66,8 +61,8 @@ func (p *IMAPProvider) ensureConnection(callback func() error) error {
return nil
}
log.WithField("attempt", i).WithError(callErr).Warning("Call failed, trying reconnect")
err := p.tryReconnect()
log.WithField("attempt", i).WithError(callErr).Warning("IMAP call failed, trying reconnect")
err := p.tryReconnect(ensureSelectedIn)
if err != nil {
return err
}
@ -75,7 +70,7 @@ func (p *IMAPProvider) ensureConnection(callback func() error) error {
return errors.Wrap(callErr, "too many retries")
}
func (p *IMAPProvider) tryReconnect() error {
func (p *IMAPProvider) tryReconnect(ensureSelectedIn string) error {
start := time.Now()
var previousErr error
for {
@ -84,6 +79,7 @@ func (p *IMAPProvider) tryReconnect() error {
}
err := pmapi.CheckConnection()
log.WithError(err).Debug("Connection check")
if err != nil {
time.Sleep(imapReconnectSleep)
previousErr = err
@ -91,24 +87,47 @@ func (p *IMAPProvider) tryReconnect() error {
}
err = p.reauth()
log.WithError(err).Debug("Reauth")
if err != nil {
time.Sleep(imapReconnectSleep)
previousErr = err
continue
}
if ensureSelectedIn != "" {
_, err = p.client.Select(ensureSelectedIn, true)
log.WithError(err).Debug("Reselect")
if err != nil {
previousErr = err
continue
}
}
break
}
return nil
}
func (p *IMAPProvider) reauth() error {
if _, err := p.client.Capability(); err != nil {
state := p.client.State()
log.WithField("addr", p.addr).WithField("state", state).WithError(err).Debug("Reconnecting")
p.client = nil
var state imap.ConnState
// In some cases once go-imap fails, we cannot issue another command
// because it would dead-lock. Let's simply ignore it, we want to open
// new connection anyway.
ch := make(chan struct{})
go func() {
defer close(ch)
if _, err := p.client.Capability(); err != nil {
state = p.client.State()
}
}()
select {
case <-ch:
case <-time.After(30 * time.Second):
}
log.WithField("addr", p.addr).WithField("state", state).Debug("Reconnecting")
p.client = nil
return p.auth()
}
@ -121,15 +140,25 @@ func (p *IMAPProvider) auth() error { //nolint[funlen]
return errors.Wrap(err, "failed to dial server")
}
client, err := imapClient.DialTLS(p.addr, nil)
var client *imapClient.Client
var err error
host, _, _ := net.SplitHostPort(p.addr)
if host == "127.0.0.1" {
client, err = imapClient.Dial(p.addr)
} else {
client, err = imapClient.DialTLS(p.addr, nil)
}
if err != nil {
return errors.Wrap(err, "failed to connect to server")
}
client.ErrorLog = &imapErrorLogger{logrus.WithField("pkg", "imap-client")}
// Logrus have Writer helper but it fails for big messages because of
// bufio.MaxScanTokenSize limit.
// This spams a lot, uncomment once needed during development.
//client.SetDebug(&imapDebugLogger{logrus.WithField("pkg", "imap-client")})
// Logrus `WriterLevel` fails for big messages because of bufio.MaxScanTokenSize limit.
// Also, this spams a lot, uncomment once needed during development.
//client.SetDebug(imap.NewDebugWriter(
// logrus.WithField("pkg", "imap/client").WriterLevel(logrus.TraceLevel),
// logrus.WithField("pkg", "imap/server").WriterLevel(logrus.TraceLevel),
//))
p.client = client
log.Info("Connected")
@ -205,16 +234,16 @@ func (p *IMAPProvider) selectIn(mailboxName string) (mailbox *imap.MailboxStatus
return
}
func (p *IMAPProvider) fetch(seqSet *imap.SeqSet, items []imap.FetchItem, processMessageCallback func(m *imap.Message)) error {
return p.fetchHelper(false, seqSet, items, processMessageCallback)
func (p *IMAPProvider) fetch(ensureSelectedIn string, seqSet *imap.SeqSet, items []imap.FetchItem, processMessageCallback func(m *imap.Message)) error {
return p.fetchHelper(false, ensureSelectedIn, seqSet, items, processMessageCallback)
}
func (p *IMAPProvider) uidFetch(seqSet *imap.SeqSet, items []imap.FetchItem, processMessageCallback func(m *imap.Message)) error {
return p.fetchHelper(true, seqSet, items, processMessageCallback)
func (p *IMAPProvider) uidFetch(ensureSelectedIn string, seqSet *imap.SeqSet, items []imap.FetchItem, processMessageCallback func(m *imap.Message)) error {
return p.fetchHelper(true, ensureSelectedIn, seqSet, items, processMessageCallback)
}
func (p *IMAPProvider) fetchHelper(uid bool, seqSet *imap.SeqSet, items []imap.FetchItem, processMessageCallback func(m *imap.Message)) error {
return p.ensureConnection(func() error {
func (p *IMAPProvider) fetchHelper(uid bool, ensureSelectedIn string, seqSet *imap.SeqSet, items []imap.FetchItem, processMessageCallback func(m *imap.Message)) error {
return p.ensureConnectionAndSelection(func() error {
messagesCh := make(chan *imap.Message)
doneCh := make(chan error)
@ -232,5 +261,5 @@ func (p *IMAPProvider) fetchHelper(uid bool, seqSet *imap.SeqSet, items []imap.F
err := <-doneCh
return err
})
}, ensureSelectedIn)
}

View File

@ -45,7 +45,7 @@ func (p *MBOXProvider) TransferFrom(rules transferRules, progress *Progress, ch
defer log.Info("Finished transfer from channel to MBOX")
for msg := range ch {
for progress.shouldStop() {
if progress.shouldStop() {
break
}

View File

@ -20,7 +20,7 @@ package transfer
import (
"sort"
"github.com/ProtonMail/gopenpgp/crypto"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/ProtonMail/proton-bridge/pkg/pmapi"
"github.com/pkg/errors"
)
@ -31,6 +31,9 @@ type PMAPIProvider struct {
userID string
addressID string
keyRing *crypto.KeyRing
importMsgReqMap map[string]*pmapi.ImportMsgReq // Key is msg transfer ID.
importMsgReqSize int
}
// NewPMAPIProvider returns new PMAPIProvider.
@ -45,6 +48,9 @@ func NewPMAPIProvider(clientManager ClientManager, userID, addressID string) (*P
userID: userID,
addressID: addressID,
keyRing: keyRing,
importMsgReqMap: map[string]*pmapi.ImportMsgReq{},
importMsgReqSize: 0,
}, nil
}

View File

@ -19,6 +19,7 @@ package transfer
import (
"fmt"
"sync"
pkgMessage "github.com/ProtonMail/proton-bridge/pkg/message"
"github.com/ProtonMail/proton-bridge/pkg/pmapi"
@ -32,11 +33,21 @@ func (p *PMAPIProvider) TransferTo(rules transferRules, progress *Progress, ch c
log.Info("Started transfer from PMAPI to channel")
defer log.Info("Finished transfer from PMAPI to channel")
go p.loadCounts(rules, progress)
// TransferTo cannot end sooner than loadCounts goroutine because
// loadCounts writes to channel in progress which would be closed.
// That can happen for really small accounts.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
p.loadCounts(rules, progress)
}()
for rule := range rules.iterateActiveRules() {
p.transferTo(rule, progress, ch, rules.skipEncryptedMessages)
}
wg.Wait()
}
func (p *PMAPIProvider) loadCounts(rules transferRules, progress *Progress) {

View File

@ -28,6 +28,11 @@ import (
"github.com/pkg/errors"
)
// Limits of one batched import request. The pending batch is sent before a new
// message is queued once it holds pmapiImportBatchMaxItems requests, or when
// adding the new message body would push the summed body sizes (in bytes)
// past pmapiImportBatchMaxSize.
const (
pmapiImportBatchMaxItems = 10
pmapiImportBatchMaxSize = 25 * 1000 * 1000 // 25 MB
)
// DefaultMailboxes returns the default mailboxes for default rules if no other is found.
func (p *PMAPIProvider) DefaultMailboxes(_ Mailbox) []Mailbox {
return []Mailbox{{
@ -67,18 +72,19 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch
defer log.Info("Finished transfer from channel to PMAPI")
for msg := range ch {
for progress.shouldStop() {
if progress.shouldStop() {
break
}
var importedID string
var err error
if p.isMessageDraft(msg) {
importedID, err = p.importDraft(msg, rules.globalMailbox)
p.transferDraft(rules, progress, msg)
} else {
importedID, err = p.importMessage(msg, rules.globalMailbox)
p.transferMessage(rules, progress, msg)
}
progress.messageImported(msg.ID, importedID, err)
}
if len(p.importMsgReqMap) > 0 {
p.importMessages(progress)
}
}
@ -91,6 +97,11 @@ func (p *PMAPIProvider) isMessageDraft(msg Message) bool {
return false
}
// transferDraft imports msg as a draft into the global mailbox from rules and
// records the outcome (imported ID or error) in progress.
func (p *PMAPIProvider) transferDraft(rules transferRules, progress *Progress, msg Message) {
	draftID, importErr := p.importDraft(msg, rules.globalMailbox)
	progress.messageImported(msg.ID, draftID, importErr)
}
func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string, error) {
message, attachmentReaders, err := p.parseMessage(msg)
if err != nil {
@ -138,15 +149,30 @@ func (p *PMAPIProvider) importDraft(msg Message, globalMailbox *Mailbox) (string
return draft.ID, nil
}
func (p *PMAPIProvider) importMessage(msg Message, globalMailbox *Mailbox) (string, error) {
// transferMessage queues msg for a batched import.
//
// The message is first turned into an import request; if that fails, the error
// is reported to progress and nothing is queued. Before queuing, the pending
// batch is sent when it already holds pmapiImportBatchMaxItems requests or
// when adding this body would exceed pmapiImportBatchMaxSize.
func (p *PMAPIProvider) transferMessage(rules transferRules, progress *Progress, msg Message) {
	req, err := p.generateImportMsgReq(msg, rules.globalMailbox)
	if err != nil {
		progress.messageImported(msg.ID, "", err)
		return
	}

	bodySize := len(req.Body)
	batchFull := len(p.importMsgReqMap) == pmapiImportBatchMaxItems
	batchTooBig := p.importMsgReqSize+bodySize > pmapiImportBatchMaxSize
	if batchTooBig || batchFull {
		p.importMessages(progress)
	}

	p.importMsgReqSize += bodySize
	p.importMsgReqMap[msg.ID] = req
}
func (p *PMAPIProvider) generateImportMsgReq(msg Message, globalMailbox *Mailbox) (*pmapi.ImportMsgReq, error) {
message, attachmentReaders, err := p.parseMessage(msg)
if err != nil {
return "", errors.Wrap(err, "failed to parse message")
return nil, errors.Wrap(err, "failed to parse message")
}
body, err := p.encryptMessage(message, attachmentReaders)
if err != nil {
return "", errors.Wrap(err, "failed to encrypt message")
return nil, errors.Wrap(err, "failed to encrypt message")
}
unread := 0
@ -165,26 +191,14 @@ func (p *PMAPIProvider) importMessage(msg Message, globalMailbox *Mailbox) (stri
labelIDs = append(labelIDs, globalMailbox.ID)
}
importMsgReq := &pmapi.ImportMsgReq{
return &pmapi.ImportMsgReq{
AddressID: p.addressID,
Body: body,
Unread: unread,
Time: message.Time,
Flags: computeMessageFlags(labelIDs),
LabelIDs: labelIDs,
}
results, err := p.importRequest([]*pmapi.ImportMsgReq{importMsgReq})
if err != nil {
return "", errors.Wrap(err, "failed to import messages")
}
if len(results) == 0 {
return "", errors.New("import ended with no result")
}
if results[0].Error != nil {
return "", errors.Wrap(results[0].Error, "failed to import message")
}
return results[0].MessageID, nil
}, nil
}
func (p *PMAPIProvider) parseMessage(msg Message) (*pmapi.Message, []io.Reader, error) {
@ -218,3 +232,65 @@ func computeMessageFlags(labels []string) (flag int64) {
return flag
}
// importMessages sends all queued import requests to the API as one batch and
// reports the outcome of every queued message to progress.
//
// Nothing is sent when a stop was requested or when the queue is empty. If the
// batch request fails, or its results cannot be matched one to one with the
// requests, every message is imported on its own instead. If the batch passes
// but the API refused some messages, only those are retried on their own.
//
// The queue and its size counter are reset on every path that processed the
// batch, including the one-by-one fallback, so no message is imported or
// reported a second time together with the next batch.
func (p *PMAPIProvider) importMessages(progress *Progress) {
	if progress.shouldStop() || len(p.importMsgReqMap) == 0 {
		return
	}

	// From here on every queued message is handled, so the queue has to be
	// emptied no matter which return path is taken.
	defer func() {
		p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{}
		p.importMsgReqSize = 0
	}()

	// Map iteration order is random; keep IDs and requests in two parallel
	// slices so that results can be matched back to messages by index.
	importMsgIDs := make([]string, 0, len(p.importMsgReqMap))
	importMsgRequests := make([]*pmapi.ImportMsgReq, 0, len(p.importMsgReqMap))
	for msgID, req := range p.importMsgReqMap {
		importMsgIDs = append(importMsgIDs, msgID)
		importMsgRequests = append(importMsgRequests, req)
	}

	log.WithField("msgIDs", importMsgIDs).WithField("size", p.importMsgReqSize).Debug("Importing messages")
	results, err := p.importRequest(importMsgRequests)

	// In case the whole request failed, or the number of results differs from
	// the number of requests (so results cannot be matched by index), try to
	// import every message one by one.
	if err != nil || len(results) != len(importMsgRequests) {
		log.WithError(err).Warning("Importing messages failed, trying one by one")
		for index, msgID := range importMsgIDs {
			importedID, err := p.importMessage(progress, importMsgRequests[index])
			progress.messageImported(msgID, importedID, err)
		}
		return
	}

	// In case request passed but some messages failed, try to import the failed ones alone.
	for index, result := range results {
		msgID := importMsgIDs[index]
		if result.Error != nil {
			log.WithError(result.Error).WithField("msg", msgID).Warning("Importing message failed, trying alone")
			importedID, err := p.importMessage(progress, importMsgRequests[index])
			progress.messageImported(msgID, importedID, err)
			continue
		}
		progress.messageImported(msgID, result.MessageID, nil)
	}
}
// importMessage imports a single request on its own and returns either the ID
// of the imported message or the reason why it was not imported. It never
// returns an empty ID together with a nil error.
//
// The API call runs inside progress.callWrap. A failure of the call itself is
// returned to callWrap as an error; an empty result list or a refusal of this
// particular message is returned to callWrap as nil and only reported through
// importedErr.
// NOTE(review): callWrap is not visible here; it is assumed that it may re-run
// the callback after an error, or return without any successful run (e.g. when
// a stop is requested) — confirm against its implementation.
func (p *PMAPIProvider) importMessage(progress *Progress, req *pmapi.ImportMsgReq) (importedID string, importedErr error) {
	progress.callWrap(func() error {
		// Every attempt starts clean so that an error remembered from an
		// earlier failed attempt cannot outlive a later successful one.
		importedID, importedErr = "", nil

		results, err := p.importRequest([]*pmapi.ImportMsgReq{req})
		if err != nil {
			// Remember the failure in case no further attempt is made.
			importedErr = errors.Wrap(err, "failed to import messages")
			return importedErr
		}
		if len(results) == 0 {
			importedErr = errors.New("import ended with no result")
			return nil // This should not happen; it would mean a bug, so skip this message.
		}
		if results[0].Error != nil {
			importedErr = errors.Wrap(results[0].Error, "failed to import message")
			return nil // Call passed but API refused this message, skip this one.
		}
		importedID = results[0].MessageID
		return nil
	})

	// Without an ID there is no imported message; do not let the caller record
	// such an outcome as a success.
	if importedID == "" && importedErr == nil {
		importedErr = errors.New("import ended with no result")
	}
	return importedID, importedErr
}

View File

@ -178,17 +178,16 @@ func setupPMAPIClientExpectationForExport(m *mocks) {
func setupPMAPIClientExpectationForImport(m *mocks) {
m.pmapiClient.EXPECT().KeyRingForAddressID(gomock.Any()).Return(m.keyring, nil).AnyTimes()
m.pmapiClient.EXPECT().Import(gomock.Any()).DoAndReturn(func(requests []*pmapi.ImportMsgReq) ([]*pmapi.ImportMsgRes, error) {
r.Equal(m.t, 1, len(requests))
request := requests[0]
for _, msgID := range []string{"msg1", "msg2"} {
if bytes.Contains(request.Body, []byte(msgID)) {
return []*pmapi.ImportMsgRes{{MessageID: msgID, Error: nil}}, nil
results := []*pmapi.ImportMsgRes{}
for _, request := range requests {
for _, msgID := range []string{"msg1", "msg2"} {
if bytes.Contains(request.Body, []byte(msgID)) {
results = append(results, &pmapi.ImportMsgRes{MessageID: msgID, Error: nil})
}
}
}
r.Fail(m.t, "No message found")
return nil, nil
}).Times(2)
return results, nil
}).AnyTimes()
}
func setupPMAPIClientExpectationForImportDraft(m *mocks) {

View File

@ -39,7 +39,7 @@ func (p *PMAPIProvider) ensureConnection(callback func() error) error {
return nil
}
log.WithField("attempt", i).WithError(callErr).Warning("Call failed, trying reconnect")
log.WithField("attempt", i).WithError(callErr).Warning("API call failed, trying reconnect")
err := p.tryReconnect()
if err != nil {
return err
@ -57,6 +57,7 @@ func (p *PMAPIProvider) tryReconnect() error {
}
err := p.clientManager.CheckConnection()
log.WithError(err).Debug("Connection check")
if err != nil {
time.Sleep(pmapiReconnectSleep)
previousErr = err

View File

@ -18,11 +18,10 @@
package transfer
import (
"bytes"
"io/ioutil"
"testing"
"github.com/ProtonMail/gopenpgp/crypto"
"github.com/ProtonMail/gopenpgp/v2/crypto"
transfermocks "github.com/ProtonMail/proton-bridge/internal/transfer/mocks"
pmapimocks "github.com/ProtonMail/proton-bridge/pkg/pmapi/mocks"
gomock "github.com/golang/mock/gomock"
@ -62,11 +61,12 @@ func newTestKeyring() *crypto.KeyRing {
if err != nil {
panic(err)
}
userKey, err := crypto.ReadArmoredKeyRing(bytes.NewReader(data))
key, err := crypto.NewKeyFromArmored(string(data))
if err != nil {
panic(err)
}
if err := userKey.Unlock([]byte("testpassphrase")); err != nil {
userKey, err := crypto.NewKeyRing(key)
if err != nil {
panic(err)
}
return userKey