forked from Silverfish/proton-bridge
Import/Export final touches
This commit is contained in:
@ -25,6 +25,25 @@ import (
|
||||
"github.com/ProtonMail/proton-bridge/pkg/pmapi"
|
||||
)
|
||||
|
||||
var systemFolderMapping = map[string]string{ //nolint[gochecknoglobals]
|
||||
"bin": "Trash",
|
||||
"junk": "Spam",
|
||||
"all": "All Mail",
|
||||
"sent mail": "Sent",
|
||||
"draft": "Drafts",
|
||||
"important": "Starred",
|
||||
// Add more translations.
|
||||
}
|
||||
|
||||
// LeastUsedColor is intended to return color for creating a new inbox or label
|
||||
func LeastUsedColor(mailboxes []Mailbox) string {
|
||||
usedColors := []string{}
|
||||
for _, m := range mailboxes {
|
||||
usedColors = append(usedColors, m.Color)
|
||||
}
|
||||
return pmapi.LeastUsedColor(usedColors)
|
||||
}
|
||||
|
||||
// Mailbox is universal data holder of mailbox details for every provider.
|
||||
type Mailbox struct {
|
||||
ID string
|
||||
@ -43,28 +62,10 @@ func (m Mailbox) Hash() string {
|
||||
return fmt.Sprintf("%x", sha256.Sum256([]byte(m.Name)))
|
||||
}
|
||||
|
||||
// LeastUsedColor is intended to return color for creating a new inbox or label
|
||||
func LeastUsedColor(mailboxes []Mailbox) string {
|
||||
usedColors := []string{}
|
||||
for _, m := range mailboxes {
|
||||
usedColors = append(usedColors, m.Color)
|
||||
}
|
||||
return pmapi.LeastUsedColor(usedColors)
|
||||
}
|
||||
|
||||
// findMatchingMailboxes returns all matching mailboxes from `mailboxes`.
|
||||
// Only one exclusive mailbox is returned.
|
||||
// Only one exclusive mailbox is included.
|
||||
func (m Mailbox) findMatchingMailboxes(mailboxes []Mailbox) []Mailbox {
|
||||
nameVariants := []string{}
|
||||
if strings.Contains(m.Name, "/") || strings.Contains(m.Name, "|") {
|
||||
for _, slashPart := range strings.Split(m.Name, "/") {
|
||||
for _, part := range strings.Split(slashPart, "|") {
|
||||
nameVariants = append(nameVariants, strings.ToLower(part))
|
||||
}
|
||||
}
|
||||
}
|
||||
nameVariants = append(nameVariants, strings.ToLower(m.Name))
|
||||
|
||||
nameVariants := m.nameVariants()
|
||||
isExclusiveIncluded := false
|
||||
matches := []Mailbox{}
|
||||
for i := range nameVariants {
|
||||
@ -83,3 +84,27 @@ func (m Mailbox) findMatchingMailboxes(mailboxes []Mailbox) []Mailbox {
|
||||
}
|
||||
return matches
|
||||
}
|
||||
|
||||
// nameVariants returns all possible variants of the mailbox name.
|
||||
// The best match (original name) is at the end of the slice.
|
||||
// Variants are all in lower case. Examples:
|
||||
// * Foo/bar -> [foo, bar, foo/bar]
|
||||
// * x/Bin -> [x, trash, bin, x/bin]
|
||||
// * a|b/c -> [a, b, c, a|b/c]
|
||||
func (m Mailbox) nameVariants() (nameVariants []string) {
|
||||
name := strings.ToLower(m.Name)
|
||||
if strings.Contains(name, "/") || strings.Contains(name, "|") {
|
||||
for _, slashPart := range strings.Split(name, "/") {
|
||||
for _, part := range strings.Split(slashPart, "|") {
|
||||
if mappedPart, ok := systemFolderMapping[part]; ok {
|
||||
nameVariants = append(nameVariants, strings.ToLower(mappedPart))
|
||||
}
|
||||
nameVariants = append(nameVariants, part)
|
||||
}
|
||||
}
|
||||
}
|
||||
if mappedName, ok := systemFolderMapping[name]; ok {
|
||||
nameVariants = append(nameVariants, strings.ToLower(mappedName))
|
||||
}
|
||||
return append(nameVariants, name)
|
||||
}
|
||||
|
||||
@ -66,6 +66,7 @@ func TestLeastUsedColor(t *testing.T) {
|
||||
}
|
||||
r.Equal(t, "#7569d1", LeastUsedColor(mailboxes))
|
||||
}
|
||||
|
||||
func TestFindMatchingMailboxes(t *testing.T) {
|
||||
mailboxes := []Mailbox{
|
||||
{Name: "Inbox", IsExclusive: true},
|
||||
@ -75,6 +76,8 @@ func TestFindMatchingMailboxes(t *testing.T) {
|
||||
{Name: "hello/world", IsExclusive: true},
|
||||
{Name: "Hello", IsExclusive: false},
|
||||
{Name: "WORLD", IsExclusive: true},
|
||||
{Name: "Trash", IsExclusive: true},
|
||||
{Name: "Drafts", IsExclusive: true},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
@ -88,6 +91,10 @@ func TestFindMatchingMailboxes(t *testing.T) {
|
||||
{"hello/world", []string{"hello/world", "Hello"}},
|
||||
{"hello|world", []string{"WORLD", "Hello"}},
|
||||
{"nomailbox", []string{}},
|
||||
{"bin", []string{"Trash"}},
|
||||
{"root/bin", []string{"Trash"}},
|
||||
{"draft", []string{"Drafts"}},
|
||||
{"root/draft", []string{"Drafts"}},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
tc := tc
|
||||
|
||||
@ -30,11 +30,12 @@ import (
|
||||
// Import and export update progress about processing messages and progress
|
||||
// informs user interface, vice versa action (such as pause or resume) from
|
||||
// user interface is passed down to import and export.
|
||||
type Progress struct {
|
||||
type Progress struct { //nolint[maligned]
|
||||
log *logrus.Entry
|
||||
lock sync.RWMutex
|
||||
lock sync.Locker
|
||||
|
||||
updateCh chan struct{}
|
||||
messageCounted bool
|
||||
messageCounts map[string]uint
|
||||
messageStatuses map[string]*MessageStatus
|
||||
pauseReason string
|
||||
@ -45,7 +46,8 @@ type Progress struct {
|
||||
|
||||
func newProgress(log *logrus.Entry, fileReport *fileReport) Progress {
|
||||
return Progress{
|
||||
log: log,
|
||||
log: log,
|
||||
lock: &sync.Mutex{},
|
||||
|
||||
updateCh: make(chan struct{}),
|
||||
messageCounts: map[string]uint{},
|
||||
@ -57,11 +59,7 @@ func newProgress(log *logrus.Entry, fileReport *fileReport) Progress {
|
||||
// update is helper to notify listener for updates.
|
||||
func (p *Progress) update() {
|
||||
if p.updateCh == nil {
|
||||
// If the progress was ended by fatal instead finish, we ignore error.
|
||||
if p.fatalError != nil {
|
||||
return
|
||||
}
|
||||
panic("update should not be called after finish was called")
|
||||
return
|
||||
}
|
||||
|
||||
// In case no one listens for an update, do not block the progress.
|
||||
@ -71,17 +69,12 @@ func (p *Progress) update() {
|
||||
}
|
||||
}
|
||||
|
||||
// start should be called before anything starts.
|
||||
func (p *Progress) start() {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
}
|
||||
|
||||
// finish should be called as the last call once everything is done.
|
||||
func (p *Progress) finish() {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
log.Debug("Progress finished")
|
||||
p.cleanUpdateCh()
|
||||
}
|
||||
|
||||
@ -90,6 +83,7 @@ func (p *Progress) fatal(err error) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
log.WithError(err).Error("Progress finished")
|
||||
p.isStopped = true
|
||||
p.fatalError = err
|
||||
p.cleanUpdateCh()
|
||||
@ -97,21 +91,26 @@ func (p *Progress) fatal(err error) {
|
||||
|
||||
func (p *Progress) cleanUpdateCh() {
|
||||
if p.updateCh == nil {
|
||||
// If the progress was ended by fatal instead finish, we ignore error.
|
||||
if p.fatalError != nil {
|
||||
return
|
||||
}
|
||||
panic("update should not be called after finish was called")
|
||||
return
|
||||
}
|
||||
|
||||
close(p.updateCh)
|
||||
p.updateCh = nil
|
||||
}
|
||||
|
||||
func (p *Progress) countsFinal() {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
defer p.update()
|
||||
|
||||
log.Info("Estimating count finished")
|
||||
p.messageCounted = true
|
||||
}
|
||||
|
||||
func (p *Progress) updateCount(mailbox string, count uint) {
|
||||
p.lock.Lock()
|
||||
defer p.update()
|
||||
defer p.lock.Unlock()
|
||||
defer p.update()
|
||||
|
||||
log.WithField("mailbox", mailbox).WithField("count", count).Debug("Mailbox count updated")
|
||||
p.messageCounts[mailbox] = count
|
||||
@ -120,8 +119,8 @@ func (p *Progress) updateCount(mailbox string, count uint) {
|
||||
// addMessage should be called as soon as there is ID of the message.
|
||||
func (p *Progress) addMessage(messageID string, rule *Rule) {
|
||||
p.lock.Lock()
|
||||
defer p.update()
|
||||
defer p.lock.Unlock()
|
||||
defer p.update()
|
||||
|
||||
p.log.WithField("id", messageID).Trace("Message added")
|
||||
p.messageStatuses[messageID] = &MessageStatus{
|
||||
@ -134,10 +133,15 @@ func (p *Progress) addMessage(messageID string, rule *Rule) {
|
||||
// messageExported should be called right before message is exported.
|
||||
func (p *Progress) messageExported(messageID string, body []byte, err error) {
|
||||
p.lock.Lock()
|
||||
defer p.update()
|
||||
defer p.lock.Unlock()
|
||||
defer p.update()
|
||||
|
||||
log := p.log.WithField("id", messageID)
|
||||
if err != nil {
|
||||
log = log.WithError(err)
|
||||
}
|
||||
log.Debug("Message exported")
|
||||
|
||||
p.log.WithField("id", messageID).WithError(err).Debug("Message exported")
|
||||
status := p.messageStatuses[messageID]
|
||||
status.exportErr = err
|
||||
if err == nil {
|
||||
@ -148,7 +152,7 @@ func (p *Progress) messageExported(messageID string, body []byte, err error) {
|
||||
status.bodyHash = fmt.Sprintf("%x", sha256.Sum256(body))
|
||||
|
||||
if header, err := getMessageHeader(body); err != nil {
|
||||
p.log.WithField("id", messageID).WithError(err).Warning("Failed to parse headers for reporting")
|
||||
log.WithError(err).Warning("Failed to parse headers for reporting")
|
||||
} else {
|
||||
status.setDetailsFromHeader(header)
|
||||
}
|
||||
@ -163,10 +167,15 @@ func (p *Progress) messageExported(messageID string, body []byte, err error) {
|
||||
// messageImported should be called right after message is imported.
|
||||
func (p *Progress) messageImported(messageID, importID string, err error) {
|
||||
p.lock.Lock()
|
||||
defer p.update()
|
||||
defer p.lock.Unlock()
|
||||
defer p.update()
|
||||
|
||||
log := p.log.WithField("id", messageID)
|
||||
if err != nil {
|
||||
log = log.WithError(err)
|
||||
}
|
||||
log.Debug("Message imported")
|
||||
|
||||
p.log.WithField("id", messageID).WithError(err).Debug("Message imported")
|
||||
p.messageStatuses[messageID].targetID = importID
|
||||
p.messageStatuses[messageID].importErr = err
|
||||
if err == nil {
|
||||
@ -187,6 +196,8 @@ func (p *Progress) logMessage(messageID string) {
|
||||
|
||||
// callWrap calls the callback and in case of problem it pause the process.
|
||||
// Then it waits for user action to fix it and click on continue or abort.
|
||||
// Every function doing I/O should be wrapped by this function to provide
|
||||
// stopping and pausing functionality.
|
||||
func (p *Progress) callWrap(callback func() error) {
|
||||
for {
|
||||
if p.shouldStop() {
|
||||
@ -222,8 +233,8 @@ func (p *Progress) GetUpdateChannel() chan struct{} {
|
||||
// Pause pauses the progress.
|
||||
func (p *Progress) Pause(reason string) {
|
||||
p.lock.Lock()
|
||||
defer p.update()
|
||||
defer p.lock.Unlock()
|
||||
defer p.update()
|
||||
|
||||
p.log.Info("Progress paused")
|
||||
p.pauseReason = reason
|
||||
@ -232,8 +243,8 @@ func (p *Progress) Pause(reason string) {
|
||||
// Resume resumes the progress.
|
||||
func (p *Progress) Resume() {
|
||||
p.lock.Lock()
|
||||
defer p.update()
|
||||
defer p.lock.Unlock()
|
||||
defer p.update()
|
||||
|
||||
p.log.Info("Progress resumed")
|
||||
p.pauseReason = ""
|
||||
@ -258,8 +269,8 @@ func (p *Progress) PauseReason() string {
|
||||
// Stop stops the process.
|
||||
func (p *Progress) Stop() {
|
||||
p.lock.Lock()
|
||||
defer p.update()
|
||||
defer p.lock.Unlock()
|
||||
defer p.update()
|
||||
|
||||
p.log.Info("Progress stopped")
|
||||
p.isStopped = true
|
||||
@ -304,6 +315,12 @@ func (p *Progress) GetCounts() (failed, imported, exported, added, total uint) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
// Return counts only once total is estimated or the process already
|
||||
// ended (for a case when it ended quickly to report it correctly).
|
||||
if p.updateCh != nil && !p.messageCounted {
|
||||
return
|
||||
}
|
||||
|
||||
// Include lost messages in the process only when transfer is done.
|
||||
includeMissing := p.updateCh == nil
|
||||
|
||||
@ -334,10 +351,10 @@ func (p *Progress) GenerateBugReport() []byte {
|
||||
return bugReport.getData()
|
||||
}
|
||||
|
||||
func (p *Progress) FileReport() (path string) {
|
||||
if r := p.fileReport; r != nil {
|
||||
path = r.path
|
||||
// FileReport returns path to generated defailed file report.
|
||||
func (p *Progress) FileReport() string {
|
||||
if p.fileReport == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return
|
||||
return p.fileReport.path
|
||||
}
|
||||
|
||||
@ -29,8 +29,6 @@ func TestProgressUpdateCount(t *testing.T) {
|
||||
progress := newProgress(log, nil)
|
||||
drainProgressUpdateChannel(&progress)
|
||||
|
||||
progress.start()
|
||||
|
||||
progress.updateCount("inbox", 10)
|
||||
progress.updateCount("archive", 20)
|
||||
progress.updateCount("inbox", 12)
|
||||
@ -48,8 +46,6 @@ func TestProgressAddingMessages(t *testing.T) {
|
||||
progress := newProgress(log, nil)
|
||||
drainProgressUpdateChannel(&progress)
|
||||
|
||||
progress.start()
|
||||
|
||||
// msg1 has no problem.
|
||||
progress.addMessage("msg1", nil)
|
||||
progress.messageExported("msg1", []byte(""), nil)
|
||||
@ -92,18 +88,16 @@ func TestProgressFinish(t *testing.T) {
|
||||
progress := newProgress(log, nil)
|
||||
drainProgressUpdateChannel(&progress)
|
||||
|
||||
progress.start()
|
||||
progress.finish()
|
||||
r.Nil(t, progress.updateCh)
|
||||
|
||||
r.Panics(t, func() { progress.addMessage("msg", nil) })
|
||||
r.NotPanics(t, func() { progress.addMessage("msg", nil) })
|
||||
}
|
||||
|
||||
func TestProgressFatalError(t *testing.T) {
|
||||
progress := newProgress(log, nil)
|
||||
drainProgressUpdateChannel(&progress)
|
||||
|
||||
progress.start()
|
||||
progress.fatal(errors.New("fatal error"))
|
||||
r.Nil(t, progress.updateCh)
|
||||
|
||||
|
||||
@ -36,6 +36,10 @@ func (p *EMLProvider) TransferTo(rules transferRules, progress *Progress, ch cha
|
||||
return
|
||||
}
|
||||
|
||||
if len(filePathsPerFolder) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// This list is not filtered by time but instead going throgh each file
|
||||
// twice or keeping all in memory we will tell rough estimation which
|
||||
// will be updated during processing each file.
|
||||
@ -46,6 +50,7 @@ func (p *EMLProvider) TransferTo(rules transferRules, progress *Progress, ch cha
|
||||
|
||||
progress.updateCount(folderName, uint(len(filePaths)))
|
||||
}
|
||||
progress.countsFinal()
|
||||
|
||||
for folderName, filePaths := range filePathsPerFolder {
|
||||
// No error guaranteed by getFilePathsPerFolder.
|
||||
|
||||
@ -21,7 +21,6 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
)
|
||||
@ -73,7 +72,7 @@ func (p *EMLProvider) createFolders(rules transferRules) error {
|
||||
|
||||
func (p *EMLProvider) writeFile(msg Message) error {
|
||||
fileName := filepath.Base(msg.ID)
|
||||
if !strings.HasSuffix(fileName, ".eml") {
|
||||
if filepath.Ext(fileName) != ".eml" {
|
||||
fileName += ".eml"
|
||||
}
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ func (p *IMAPProvider) ID() string {
|
||||
// Mailboxes returns all available folder names from root of EML files.
|
||||
// In case the same folder name is used more than once (for example root/a/foo
|
||||
// and root/b/foo), it's treated as the same folder.
|
||||
func (p *IMAPProvider) Mailboxes(includEmpty, includeAllMail bool) ([]Mailbox, error) {
|
||||
func (p *IMAPProvider) Mailboxes(includeEmpty, includeAllMail bool) ([]Mailbox, error) {
|
||||
mailboxesInfo, err := p.list()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -73,11 +73,11 @@ func (p *IMAPProvider) Mailboxes(includEmpty, includeAllMail bool) ([]Mailbox, e
|
||||
break
|
||||
}
|
||||
}
|
||||
if hasNoSelect || mailbox.Name == "[Gmail]" {
|
||||
if hasNoSelect {
|
||||
continue
|
||||
}
|
||||
|
||||
if !includEmpty || true {
|
||||
if !includeEmpty || true {
|
||||
mailboxStatus, err := p.selectIn(mailbox.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -72,6 +72,7 @@ func (p *IMAPProvider) loadMessageInfoMap(rules transferRules, progress *Progres
|
||||
res[rule.SourceMailbox.Name] = messagesInfo
|
||||
progress.updateCount(rule.SourceMailbox.Name, uint(len(messagesInfo)))
|
||||
}
|
||||
progress.countsFinal()
|
||||
|
||||
return res
|
||||
}
|
||||
@ -109,7 +110,9 @@ func (p *IMAPProvider) loadMessagesInfo(rule *Rule, progress *Progress, uidValid
|
||||
return
|
||||
}
|
||||
}
|
||||
id := fmt.Sprintf("%s_%d:%d", rule.SourceMailbox.Name, uidValidity, imapMessage.Uid)
|
||||
id := getUniqueMessageID(rule.SourceMailbox.Name, uidValidity, imapMessage.Uid)
|
||||
// We use ID as key to ensure we have every unique message only once.
|
||||
// Some IMAP servers responded twice the same message...
|
||||
messagesInfo[id] = imapMessageInfo{
|
||||
id: id,
|
||||
uid: imapMessage.Uid,
|
||||
@ -173,6 +176,10 @@ func (p *IMAPProvider) exportMessages(rule *Rule, progress *Progress, ch chan<-
|
||||
items := []imap.FetchItem{imap.FetchUid, imap.FetchFlags, section.FetchItem()}
|
||||
|
||||
processMessageCallback := func(imapMessage *imap.Message) {
|
||||
if progress.shouldStop() {
|
||||
return
|
||||
}
|
||||
|
||||
id, ok := uidToID[imapMessage.Uid]
|
||||
|
||||
// Sometimes, server sends not requested messages.
|
||||
@ -217,3 +224,7 @@ func (p *IMAPProvider) exportMessage(rule *Rule, id string, imapMessage *imap.Me
|
||||
Targets: rule.TargetMailboxes,
|
||||
}
|
||||
}
|
||||
|
||||
func getUniqueMessageID(mailboxName string, uidValidity, uid uint32) string {
|
||||
return fmt.Sprintf("%s_%d:%d", mailboxName, uidValidity, uid)
|
||||
}
|
||||
|
||||
@ -40,6 +40,10 @@ func (p *MBOXProvider) TransferTo(rules transferRules, progress *Progress, ch ch
|
||||
return
|
||||
}
|
||||
|
||||
if len(filePathsPerFolder) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for folderName, filePaths := range filePathsPerFolder {
|
||||
// No error guaranteed by getFilePathsPerFolder.
|
||||
rule, _ := rules.getRuleBySourceMailboxName(folderName)
|
||||
@ -50,6 +54,7 @@ func (p *MBOXProvider) TransferTo(rules transferRules, progress *Progress, ch ch
|
||||
p.updateCount(rule, progress, filePath)
|
||||
}
|
||||
}
|
||||
progress.countsFinal()
|
||||
|
||||
for folderName, filePaths := range filePathsPerFolder {
|
||||
// No error guaranteed by getFilePathsPerFolder.
|
||||
|
||||
@ -24,6 +24,7 @@ import (
|
||||
pkgMessage "github.com/ProtonMail/proton-bridge/pkg/message"
|
||||
"github.com/ProtonMail/proton-bridge/pkg/pmapi"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const pmapiListPageSize = 150
|
||||
@ -59,10 +60,11 @@ func (p *PMAPIProvider) loadCounts(rules transferRules, progress *Progress) {
|
||||
rule := rule
|
||||
progress.callWrap(func() error {
|
||||
_, total, err := p.listMessages(&pmapi.MessagesFilter{
|
||||
LabelID: rule.SourceMailbox.ID,
|
||||
Begin: rule.FromTime,
|
||||
End: rule.ToTime,
|
||||
Limit: 0,
|
||||
AddressID: p.addressID,
|
||||
LabelID: rule.SourceMailbox.ID,
|
||||
Begin: rule.FromTime,
|
||||
End: rule.ToTime,
|
||||
Limit: 0,
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Warning("Problem to load counts")
|
||||
@ -72,10 +74,11 @@ func (p *PMAPIProvider) loadCounts(rules transferRules, progress *Progress) {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
progress.countsFinal()
|
||||
}
|
||||
|
||||
func (p *PMAPIProvider) transferTo(rule *Rule, progress *Progress, ch chan<- Message, skipEncryptedMessages bool) {
|
||||
nextID := ""
|
||||
page := 0
|
||||
for {
|
||||
if progress.shouldStop() {
|
||||
break
|
||||
@ -84,30 +87,33 @@ func (p *PMAPIProvider) transferTo(rule *Rule, progress *Progress, ch chan<- Mes
|
||||
isLastPage := true
|
||||
|
||||
progress.callWrap(func() error {
|
||||
// Would be better to filter by Begin and BeginID to be sure
|
||||
// in case user deletes messages during the process, no message
|
||||
// is skipped (paging is off then), but API does not support
|
||||
// filtering by both mentioned fields at the same time.
|
||||
desc := false
|
||||
pmapiMessages, count, err := p.listMessages(&pmapi.MessagesFilter{
|
||||
pmapiMessages, total, err := p.listMessages(&pmapi.MessagesFilter{
|
||||
AddressID: p.addressID,
|
||||
LabelID: rule.SourceMailbox.ID,
|
||||
Begin: rule.FromTime,
|
||||
End: rule.ToTime,
|
||||
BeginID: nextID,
|
||||
PageSize: pmapiListPageSize,
|
||||
Page: 0,
|
||||
Page: page,
|
||||
Sort: "ID",
|
||||
Desc: &desc,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.WithField("label", rule.SourceMailbox.ID).WithField("next", nextID).WithField("count", count).Debug("Listing messages")
|
||||
log.WithFields(logrus.Fields{
|
||||
"label": rule.SourceMailbox.ID,
|
||||
"page": page,
|
||||
"total": total,
|
||||
"count": len(pmapiMessages),
|
||||
}).Debug("Listing messages")
|
||||
|
||||
isLastPage = len(pmapiMessages) < pmapiListPageSize
|
||||
|
||||
// The first ID is the last one from the last page (= do not export twice the same one).
|
||||
if nextID != "" {
|
||||
pmapiMessages = pmapiMessages[1:]
|
||||
}
|
||||
|
||||
for _, pmapiMessage := range pmapiMessages {
|
||||
if progress.shouldStop() {
|
||||
break
|
||||
@ -122,9 +128,7 @@ func (p *PMAPIProvider) transferTo(rule *Rule, progress *Progress, ch chan<- Mes
|
||||
}
|
||||
}
|
||||
|
||||
if !isLastPage {
|
||||
nextID = pmapiMessages[len(pmapiMessages)-1].ID
|
||||
}
|
||||
page++
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -71,6 +71,11 @@ func (p *PMAPIProvider) TransferFrom(rules transferRules, progress *Progress, ch
|
||||
log.Info("Started transfer from channel to PMAPI")
|
||||
defer log.Info("Finished transfer from channel to PMAPI")
|
||||
|
||||
// Cache has to be cleared before each transfer to not contain
|
||||
// old stuff from previous cancelled run.
|
||||
p.importMsgReqMap = map[string]*pmapi.ImportMsgReq{}
|
||||
p.importMsgReqSize = 0
|
||||
|
||||
for msg := range ch {
|
||||
if progress.shouldStop() {
|
||||
break
|
||||
|
||||
@ -229,8 +229,8 @@ func (r *transferRules) getRule(sourceMailbox Mailbox) *Rule {
|
||||
return r.rules[h]
|
||||
}
|
||||
|
||||
// getRules returns all set rules.
|
||||
func (r *transferRules) getRules() []*Rule {
|
||||
// getSortedRules returns all set rules in order by `byRuleOrder`.
|
||||
func (r *transferRules) getSortedRules() []*Rule {
|
||||
rules := []*Rule{}
|
||||
for _, rule := range r.rules {
|
||||
rules = append(rules, rule)
|
||||
|
||||
@ -239,7 +239,7 @@ func TestOrderRules(t *testing.T) {
|
||||
}
|
||||
|
||||
gotMailboxNames := []string{}
|
||||
for _, rule := range transferRules.getRules() {
|
||||
for _, rule := range transferRules.getSortedRules() {
|
||||
gotMailboxNames = append(gotMailboxNames, rule.SourceMailbox.Name)
|
||||
}
|
||||
|
||||
|
||||
@ -34,10 +34,11 @@ type Transfer struct {
|
||||
panicHandler PanicHandler
|
||||
metrics MetricsManager
|
||||
id string
|
||||
dir string
|
||||
logDir string
|
||||
rules transferRules
|
||||
source SourceProvider
|
||||
target TargetProvider
|
||||
rulesCache []*Rule
|
||||
sourceMboxCache []Mailbox
|
||||
targetMboxCache []Mailbox
|
||||
}
|
||||
@ -47,14 +48,14 @@ type Transfer struct {
|
||||
// source := transfer.NewEMLProvider(...)
|
||||
// target := transfer.NewPMAPIProvider(...)
|
||||
// transfer.New(source, target, ...)
|
||||
func New(panicHandler PanicHandler, metrics MetricsManager, transferDir string, source SourceProvider, target TargetProvider) (*Transfer, error) {
|
||||
func New(panicHandler PanicHandler, metrics MetricsManager, logDir, rulesDir string, source SourceProvider, target TargetProvider) (*Transfer, error) {
|
||||
transferID := fmt.Sprintf("%x", sha256.Sum256([]byte(source.ID()+"-"+target.ID())))
|
||||
rules := loadRules(transferDir, transferID)
|
||||
rules := loadRules(rulesDir, transferID)
|
||||
transfer := &Transfer{
|
||||
panicHandler: panicHandler,
|
||||
metrics: metrics,
|
||||
id: transferID,
|
||||
dir: transferDir,
|
||||
logDir: logDir,
|
||||
rules: rules,
|
||||
source: source,
|
||||
target: target,
|
||||
@ -108,16 +109,19 @@ func (t *Transfer) SetGlobalTimeLimit(fromTime, toTime int64) {
|
||||
|
||||
// SetRule sets sourceMailbox for transfer.
|
||||
func (t *Transfer) SetRule(sourceMailbox Mailbox, targetMailboxes []Mailbox, fromTime, toTime int64) error {
|
||||
t.rulesCache = nil
|
||||
return t.rules.setRule(sourceMailbox, targetMailboxes, fromTime, toTime)
|
||||
}
|
||||
|
||||
// UnsetRule unsets sourceMailbox from transfer.
|
||||
func (t *Transfer) UnsetRule(sourceMailbox Mailbox) {
|
||||
t.rulesCache = nil
|
||||
t.rules.unsetRule(sourceMailbox)
|
||||
}
|
||||
|
||||
// ResetRules unsets all rules.
|
||||
func (t *Transfer) ResetRules() {
|
||||
t.rulesCache = nil
|
||||
t.rules.reset()
|
||||
}
|
||||
|
||||
@ -128,7 +132,10 @@ func (t *Transfer) GetRule(sourceMailbox Mailbox) *Rule {
|
||||
|
||||
// GetRules returns all set transfer rules.
|
||||
func (t *Transfer) GetRules() []*Rule {
|
||||
return t.rules.getRules()
|
||||
if t.rulesCache == nil {
|
||||
t.rulesCache = t.rules.getSortedRules()
|
||||
}
|
||||
return t.rulesCache
|
||||
}
|
||||
|
||||
// SourceMailboxes returns mailboxes available at source side.
|
||||
@ -171,7 +178,7 @@ func (t *Transfer) Start() *Progress {
|
||||
t.metrics.Start()
|
||||
|
||||
log := log.WithField("id", t.id)
|
||||
reportFile := newFileReport(t.dir, t.id)
|
||||
reportFile := newFileReport(t.logDir, t.id)
|
||||
progress := newProgress(log, reportFile)
|
||||
|
||||
ch := make(chan Message)
|
||||
@ -179,7 +186,6 @@ func (t *Transfer) Start() *Progress {
|
||||
go func() {
|
||||
defer t.panicHandler.HandlePanic()
|
||||
|
||||
progress.start()
|
||||
t.source.TransferTo(t.rules, &progress, ch)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
Reference in New Issue
Block a user