mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-14 06:26:43 +00:00
Import/Export backend prep
This commit is contained in:
210
internal/transfer/provider_imap_source.go
Normal file
210
internal/transfer/provider_imap_source.go
Normal file
@ -0,0 +1,210 @@
|
||||
// Copyright (c) 2020 Proton Technologies AG
|
||||
//
|
||||
// This file is part of ProtonMail Bridge.
|
||||
//
|
||||
// ProtonMail Bridge is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// ProtonMail Bridge is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with ProtonMail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package transfer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/emersion/go-imap"
|
||||
)
|
||||
|
||||
type imapMessageInfo struct {
|
||||
id string
|
||||
uid uint32
|
||||
size uint32
|
||||
}
|
||||
|
||||
const (
|
||||
imapPageSize = uint32(2000) // Optimized on Gmail.
|
||||
imapMaxFetchSize = uint32(50 * 1000 * 1000) // Size in octets. If 0, it will use one fetch per message.
|
||||
)
|
||||
|
||||
// TransferTo exports messages based on rules to channel.
|
||||
func (p *IMAPProvider) TransferTo(rules transferRules, progress *Progress, ch chan<- Message) {
|
||||
log.Info("Started transfer from IMAP to channel")
|
||||
defer log.Info("Finished transfer from IMAP to channel")
|
||||
|
||||
imapMessageInfoMap := p.loadMessageInfoMap(rules, progress)
|
||||
|
||||
for rule := range rules.iterateActiveRules() {
|
||||
log.WithField("rule", rule).Debug("Processing rule")
|
||||
messagesInfo := imapMessageInfoMap[rule.SourceMailbox.Name]
|
||||
p.transferTo(rule, messagesInfo, progress, ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *IMAPProvider) loadMessageInfoMap(rules transferRules, progress *Progress) map[string]map[string]imapMessageInfo {
|
||||
res := map[string]map[string]imapMessageInfo{}
|
||||
|
||||
for rule := range rules.iterateActiveRules() {
|
||||
if progress.shouldStop() {
|
||||
break
|
||||
}
|
||||
|
||||
mailboxName := rule.SourceMailbox.Name
|
||||
var mailbox *imap.MailboxStatus
|
||||
progress.callWrap(func() error {
|
||||
var err error
|
||||
mailbox, err = p.selectIn(mailboxName)
|
||||
return err
|
||||
})
|
||||
if mailbox.Messages == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
messagesInfo := p.loadMessagesInfo(rule, progress, mailbox.UidValidity)
|
||||
res[rule.SourceMailbox.Name] = messagesInfo
|
||||
progress.updateCount(rule.SourceMailbox.Name, uint(len(messagesInfo)))
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValidity uint32) map[string]imapMessageInfo {
|
||||
messagesInfo := map[string]imapMessageInfo{}
|
||||
|
||||
pageStart := uint32(1)
|
||||
pageEnd := imapPageSize
|
||||
for {
|
||||
if progress.shouldStop() {
|
||||
break
|
||||
}
|
||||
|
||||
seqSet := &imap.SeqSet{}
|
||||
seqSet.AddRange(pageStart, pageEnd)
|
||||
|
||||
items := []imap.FetchItem{imap.FetchUid, imap.FetchRFC822Size}
|
||||
if rule.HasTimeLimit() {
|
||||
items = append(items, imap.FetchEnvelope)
|
||||
}
|
||||
|
||||
pageMsgCount := uint32(0)
|
||||
processMessageCallback := func(imapMessage *imap.Message) {
|
||||
pageMsgCount++
|
||||
if rule.HasTimeLimit() {
|
||||
t := imapMessage.Envelope.Date.Unix()
|
||||
if t != 0 && !rule.isTimeInRange(t) {
|
||||
log.WithField("uid", imapMessage.Uid).Debug("Message skipped due to time")
|
||||
return
|
||||
}
|
||||
}
|
||||
id := fmt.Sprintf("%s_%d:%d", rule.SourceMailbox.Name, uidValidity, imapMessage.Uid)
|
||||
messagesInfo[id] = imapMessageInfo{
|
||||
id: id,
|
||||
uid: imapMessage.Uid,
|
||||
size: imapMessage.Size,
|
||||
}
|
||||
progress.addMessage(id, rule)
|
||||
}
|
||||
|
||||
progress.callWrap(func() error {
|
||||
return p.fetch(seqSet, items, processMessageCallback)
|
||||
})
|
||||
|
||||
if pageMsgCount < imapPageSize {
|
||||
break
|
||||
}
|
||||
|
||||
pageStart = pageEnd
|
||||
pageEnd += imapPageSize
|
||||
}
|
||||
|
||||
return messagesInfo
|
||||
}
|
||||
|
||||
func (p *IMAPProvider) transferTo(rule *Rule, messagesInfo map[string]imapMessageInfo, progress *Progress, ch chan<- Message) {
|
||||
progress.callWrap(func() error {
|
||||
_, err := p.selectIn(rule.SourceMailbox.Name)
|
||||
return err
|
||||
})
|
||||
|
||||
seqSet := &imap.SeqSet{}
|
||||
seqSetSize := uint32(0)
|
||||
uidToID := map[uint32]string{}
|
||||
|
||||
for _, messageInfo := range messagesInfo {
|
||||
if progress.shouldStop() {
|
||||
break
|
||||
}
|
||||
|
||||
if seqSetSize != 0 && (seqSetSize+messageInfo.size) > imapMaxFetchSize {
|
||||
log.WithField("mailbox", rule.SourceMailbox.Name).WithField("seq", seqSet).WithField("size", seqSetSize).Debug("Fetching messages")
|
||||
|
||||
p.exportMessages(rule, progress, ch, seqSet, uidToID)
|
||||
|
||||
seqSet = &imap.SeqSet{}
|
||||
seqSetSize = 0
|
||||
uidToID = map[uint32]string{}
|
||||
}
|
||||
|
||||
seqSet.AddNum(messageInfo.uid)
|
||||
seqSetSize += messageInfo.size
|
||||
uidToID[messageInfo.uid] = messageInfo.id
|
||||
}
|
||||
}
|
||||
|
||||
func (p *IMAPProvider) exportMessages(rule *Rule, progress *Progress, ch chan<- Message, seqSet *imap.SeqSet, uidToID map[uint32]string) {
|
||||
section := &imap.BodySectionName{}
|
||||
items := []imap.FetchItem{imap.FetchUid, imap.FetchFlags, section.FetchItem()}
|
||||
|
||||
processMessageCallback := func(imapMessage *imap.Message) {
|
||||
id, ok := uidToID[imapMessage.Uid]
|
||||
|
||||
// Sometimes, server sends not requested messages.
|
||||
if !ok {
|
||||
log.WithField("uid", imapMessage.Uid).Warning("Message skipped: not requested")
|
||||
return
|
||||
}
|
||||
|
||||
// Sometimes, server sends message twice, once with body and once without it.
|
||||
bodyReader := imapMessage.GetBody(section)
|
||||
if bodyReader == nil {
|
||||
log.WithField("uid", imapMessage.Uid).Warning("Message skipped: no body")
|
||||
return
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(bodyReader)
|
||||
progress.messageExported(id, body, err)
|
||||
if err == nil {
|
||||
msg := p.exportMessage(rule, id, imapMessage, body)
|
||||
ch <- msg
|
||||
}
|
||||
}
|
||||
|
||||
progress.callWrap(func() error {
|
||||
return p.uidFetch(seqSet, items, processMessageCallback)
|
||||
})
|
||||
}
|
||||
|
||||
func (p *IMAPProvider) exportMessage(rule *Rule, id string, imapMessage *imap.Message, body []byte) Message {
|
||||
unread := true
|
||||
for _, flag := range imapMessage.Flags {
|
||||
if flag == imap.SeenFlag {
|
||||
unread = false
|
||||
}
|
||||
}
|
||||
|
||||
return Message{
|
||||
ID: id,
|
||||
Unread: unread,
|
||||
Body: body,
|
||||
Source: rule.SourceMailbox,
|
||||
Targets: rule.TargetMailboxes,
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user