Try load messages one-by-one

This commit is contained in:
Michal Horejsek
2020-11-06 14:23:38 +01:00
parent 70645c1732
commit ba65494fce
8 changed files with 393 additions and 89 deletions

View File

@ -84,12 +84,37 @@ func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValid
p.timeIt.start("load", rule.SourceMailbox.Name)
defer p.timeIt.stop("load", rule.SourceMailbox.Name)
log := log.WithField("mailbox", rule.SourceMailbox.Name)
messagesInfo := map[string]imapMessageInfo{}
fetchItems := []imap.FetchItem{imap.FetchUid, imap.FetchRFC822Size}
if rule.HasTimeLimit() {
fetchItems = append(fetchItems, imap.FetchEnvelope)
}
processMessageCallback := func(imapMessage *imap.Message) {
if rule.HasTimeLimit() {
t := imapMessage.Envelope.Date.Unix()
if t != 0 && !rule.isTimeInRange(t) {
log.WithField("uid", imapMessage.Uid).Debug("Message skipped due to time")
return
}
}
id := getUniqueMessageID(rule.SourceMailbox.Name, uidValidity, imapMessage.Uid)
// We use ID as key to ensure we have every unique message only once.
// Some IMAP servers responded twice the same message...
messagesInfo[id] = imapMessageInfo{
id: id,
uid: imapMessage.Uid,
size: imapMessage.Size,
}
progress.addMessage(id, []string{rule.SourceMailbox.Name}, rule.TargetMailboxNames())
}
pageStart := uint32(1)
pageEnd := imapPageSize
for {
if progress.shouldStop() {
if progress.shouldStop() || pageStart > count {
break
}
@ -100,45 +125,21 @@ func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValid
seqSet := &imap.SeqSet{}
seqSet.AddRange(pageStart, pageEnd)
items := []imap.FetchItem{imap.FetchUid, imap.FetchRFC822Size}
if rule.HasTimeLimit() {
items = append(items, imap.FetchEnvelope)
}
pageMsgCount := uint32(0)
processMessageCallback := func(imapMessage *imap.Message) {
pageMsgCount++
if rule.HasTimeLimit() {
t := imapMessage.Envelope.Date.Unix()
if t != 0 && !rule.isTimeInRange(t) {
log.WithField("uid", imapMessage.Uid).Debug("Message skipped due to time")
return
err := p.fetch(rule.SourceMailbox.Name, seqSet, fetchItems, processMessageCallback)
if err != nil {
log.WithError(err).WithField("idx", seqSet).Warning("Load batch fetch failed, trying one by one")
for ; pageStart <= pageEnd; pageStart++ {
seqSet := &imap.SeqSet{}
seqSet.AddNum(pageStart)
if err := p.fetch(rule.SourceMailbox.Name, seqSet, fetchItems, processMessageCallback); err != nil {
log.WithError(err).WithField("idx", seqSet).Warning("Load fetch failed, skipping the message")
}
}
id := getUniqueMessageID(rule.SourceMailbox.Name, uidValidity, imapMessage.Uid)
// We use ID as key to ensure we have every unique message only once.
// Some IMAP servers responded twice the same message...
messagesInfo[id] = imapMessageInfo{
id: id,
uid: imapMessage.Uid,
size: imapMessage.Size,
}
progress.addMessage(id, []string{rule.SourceMailbox.Name}, rule.TargetMailboxNames())
}
progress.callWrap(func() error {
return p.fetch(rule.SourceMailbox.Name, seqSet, items, processMessageCallback)
})
if pageMsgCount < imapPageSize {
break
}
pageStart = pageEnd
pageStart = pageEnd + 1
pageEnd += imapPageSize
}
return messagesInfo
}