proton-bridge/internal/store/sync.go
// Copyright (c) 2021 Proton Technologies AG
//
// This file is part of ProtonMail Bridge.
//
// ProtonMail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// ProtonMail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with ProtonMail Bridge. If not, see <https://www.gnu.org/licenses/>.
package store

import (
	"context"
	"math"
	"sync"
	"sync/atomic"

	"github.com/ProtonMail/proton-bridge/pkg/pmapi"
	"github.com/pkg/errors"
)

const (
	// syncMinPagesPerWorker is the minimum number of pages of work that justifies adding another worker.
	syncMinPagesPerWorker = 10
	// syncMessagesMaxWorkers is the maximum number of parallel sync workers.
	syncMessagesMaxWorkers = 5
	// maxFilterPageSize is the page size used when listing messages.
	maxFilterPageSize = 150
)
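
// storeSynchronizer is the part of the store used by the sync: it provides the locally
// known message IDs, applies create/update and delete events, and persists the sync
// state so that an interrupted sync can be resumed.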
type storeSynchronizer interface {
	getAllMessageIDs() ([]string, error)
	createOrUpdateMessagesEvent([]*pmapi.Message) error
	deleteMessagesEvent([]string) error
	saveSyncState(finishTime int64, idRanges []*syncIDRange, idsToBeDeleted []string)
}
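
// messageLister is the part of the API client needed by the sync: listing messages by
// filter, returning one page of messages together with the total count.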
type messageLister interface {
	ListMessages(context.Context, *pmapi.MessagesFilter) ([]*pmapi.Message, int, error)
}
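
// syncAllMail synchronises the full All Mail label. It splits the mailbox into ID
// ranges, syncs each range in a parallel worker, and finally deletes local messages
// that were not seen on the API side.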
func syncAllMail(panicHandler PanicHandler, store storeSynchronizer, api messageLister, syncState *syncState) error {
	labelID := pmapi.AllMailLabel

	// When a full sync starts (i.e. one is not already in progress), we need to load
	//  - all message IDs in the database, so we can see which messages to remove at the end of the sync,
	//  - the ID ranges which determine how the work is split between workers.
	if !syncState.isIncomplete() {
		if err := syncState.loadMessageIDsToBeDeleted(); err != nil {
			return errors.Wrap(err, "failed to load message IDs")
		}
		if err := findIDRanges(labelID, api, syncState); err != nil {
			return errors.Wrap(err, "failed to load ID ranges")
		}
		syncState.save()
	}

	wg := &sync.WaitGroup{}

	var (
		shouldStop  int32 // Set to 1 atomically to tell the remaining workers to stop.
		resultMutex sync.Mutex
		resultError error
	)

	for _, idRange := range syncState.idRanges {
		wg.Add(1)
		idRange := idRange // Bind for goroutine.

		go func() {
			defer panicHandler.HandlePanic()
			defer wg.Done()

			if err := syncBatch(labelID, store, api, syncState, idRange, &shouldStop); err != nil {
				atomic.StoreInt32(&shouldStop, 1)

				resultMutex.Lock()
				resultError = errors.Wrap(err, "failed to sync group")
				resultMutex.Unlock()
			}
		}()
	}

	wg.Wait()

	if resultError == nil {
		if err := syncState.deleteMessagesToBeDeleted(); err != nil {
			return errors.Wrap(err, "failed to delete messages")
		}
	}

	return resultError
}
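
// findIDRanges determines how to split the label into ID ranges so the sync can run in
// parallel. It asks the API for the total message count, derives the number of workers
// from the number of pages, and records a split ID for each range in the sync state.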
func findIDRanges(labelID string, api messageLister, syncState *syncState) error {
	_, count, err := getSplitIDAndCount(labelID, api, 0)
	if err != nil {
		return errors.Wrap(err, "failed to get first ID and count")
	}

	log.WithField("total", count).Debug("Finding ID ranges")

	if count == 0 {
		return nil
	}

	syncState.initIDRanges()

	pages := int(math.Ceil(float64(count) / float64(maxFilterPageSize)))

	workers := (pages / syncMinPagesPerWorker) + 1
	if workers > syncMessagesMaxWorkers {
		workers = syncMessagesMaxWorkers
	}
	if workers == 1 {
		return nil
	}

	step := int(math.Round(float64(pages) / float64(workers)))
	// Increase the step if rounding would otherwise produce more ranges than the
	// maximum number of workers allows.
	if (step*syncMessagesMaxWorkers)+1 < pages {
		step++
	}
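
	// For example (illustrative numbers): 12,000 messages with a page size of 150 give
	// 80 pages; 80/10+1 = 9 workers, capped at 5; step = round(80/5) = 16, so split IDs
	// are fetched at pages 16, 32, 48 and 64, giving five ranges in total (the initial
	// range plus four splits).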
	for page := step; page < pages; page += step {
		splitID, _, err := getSplitIDAndCount(labelID, api, page)
		if err != nil {
			return errors.Wrap(err, "failed to get ID range")
		}
		// Some messages were probably deleted, so this page no longer exists.
		// It would be cleaner to restart this function, but rather than spending
		// more API calls on finding new split points, we start the sync with the
		// ranges found so far.
		if splitID == "" {
			break
		}
		syncState.addIDRange(splitID)
	}

	return nil
}
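
// getSplitIDAndCount returns the ID of the first message on the given page (sorted by
// ID ascending) together with the total number of messages in the label. An empty ID
// with no error means the page does not exist.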
func getSplitIDAndCount(labelID string, api messageLister, page int) (string, int, error) {
	sort := "ID"
	desc := false
	filter := &pmapi.MessagesFilter{
		LabelID:  labelID,
		Sort:     sort,
		Desc:     &desc,
		PageSize: maxFilterPageSize,
		Page:     page,
		Limit:    1,
	}

	// If the page does not exist, an empty page instead of an error is returned.
	messages, total, err := api.ListMessages(context.Background(), filter)
	if err != nil {
		return "", 0, errors.Wrap(err, "failed to list messages")
	}
	if len(messages) == 0 {
		return "", 0, nil
	}

	return messages[0].ID, total, nil
}
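
// syncBatch processes a single ID range: it repeatedly lists pages of messages between
// the range's StartID and StopID in descending ID order, marks them as seen, stores
// them, and narrows the range after each page so that an interrupted batch can resume
// from the saved sync state.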
func syncBatch( //nolint[funlen]
	labelID string,
	store storeSynchronizer,
	api messageLister,
	syncState *syncState,
	idRange *syncIDRange,
	shouldStop *int32,
) error {
	log.WithField("start", idRange.StartID).WithField("stop", idRange.StopID).Info("Starting sync batch")

	for {
		if atomic.LoadInt32(shouldStop) == 1 || idRange.isFinished() {
			break
		}

		sort := "ID"
		desc := true
		filter := &pmapi.MessagesFilter{
			LabelID:  labelID,
			Sort:     sort,
			Desc:     &desc,
			PageSize: maxFilterPageSize,
			Page:     0,
			// Messages matching BeginID and EndID are included, so the boundary
			// messages are processed twice; that's OK. It also behaves correctly
			// when a boundary message has been deleted in the meantime.
			BeginID: idRange.StartID,
			EndID:   idRange.StopID,
		}

		log.WithField("begin", filter.BeginID).WithField("end", filter.EndID).Debug("Fetching page")

		messages, _, err := api.ListMessages(context.Background(), filter)
		if err != nil {
			return errors.Wrap(err, "failed to list messages")
		}
		if len(messages) == 0 {
			break
		}

		for _, m := range messages {
			syncState.doNotDeleteMessageID(m.ID)
		}
		syncState.save()

		if err := store.createOrUpdateMessagesEvent(messages); err != nil {
			return errors.Wrap(err, "failed to create or update messages")
		}

		// Narrow the range so the next page starts where this one ended.
		pageLastMessageID := messages[len(messages)-1].ID
		if !desc {
			idRange.setStartID(pageLastMessageID)
		} else {
			idRange.setStopID(pageLastMessageID)
		}

		if len(messages) < maxFilterPageSize {
			break
		}
	}

	return nil
}