GODT-1815: Combined/Split mode

This commit is contained in:
James Houlahan
2022-09-28 11:29:33 +02:00
parent 9670e29d9f
commit e9672e6bba
55 changed files with 1909 additions and 705 deletions

View File

@ -4,57 +4,34 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/proton-bridge/v2/internal/events"
"github.com/ProtonMail/proton-bridge/v2/internal/vault"
"github.com/bradenaw/juniper/xslices"
"github.com/google/uuid"
"gitlab.protontech.ch/go/liteapi"
"golang.org/x/exp/slices"
)
const chunkSize = 1 << 20
func (user *User) sync(ctx context.Context) error {
user.notifyCh <- events.SyncStarted{
UserID: user.ID(),
}
if err := user.syncLabels(ctx); err != nil {
return fmt.Errorf("failed to sync labels: %w", err)
}
if err := user.syncMessages(ctx); err != nil {
return fmt.Errorf("failed to sync messages: %w", err)
}
user.notifyCh <- events.SyncFinished{
UserID: user.ID(),
}
if err := user.vault.SetSync(true); err != nil {
return fmt.Errorf("failed to update sync status: %w", err)
}
return nil
}
func (user *User) syncLabels(ctx context.Context) error {
func (user *User) syncLabels(ctx context.Context, addrIDs ...string) error {
// Sync the system folders.
system, err := user.client.GetLabels(ctx, liteapi.LabelTypeSystem)
if err != nil {
return err
}
for _, label := range system {
user.updateCh <- newSystemMailboxCreatedUpdate(imap.LabelID(label.ID), label.Name)
for _, label := range xslices.Filter(system, func(label liteapi.Label) bool { return wantLabelID(label.ID) }) {
for _, addrID := range addrIDs {
user.updateCh[addrID].Enqueue(newSystemMailboxCreatedUpdate(imap.LabelID(label.ID), label.Name))
}
}
// Create Folders/Labels mailboxes with a random ID and with the \Noselect attribute.
for _, prefix := range []string{folderPrefix, labelPrefix} {
user.updateCh <- newPlaceHolderMailboxCreatedUpdate(prefix)
for _, addrID := range addrIDs {
user.updateCh[addrID].Enqueue(newPlaceHolderMailboxCreatedUpdate(prefix))
}
}
// Sync the API folders.
@ -64,7 +41,9 @@ func (user *User) syncLabels(ctx context.Context) error {
}
for _, folder := range folders {
user.updateCh <- newMailboxCreatedUpdate(imap.LabelID(folder.ID), []string{folderPrefix, folder.Path})
for _, addrID := range addrIDs {
user.updateCh[addrID].Enqueue(newMailboxCreatedUpdate(imap.LabelID(folder.ID), []string{folderPrefix, folder.Path}))
}
}
// Sync the API labels.
@ -74,7 +53,9 @@ func (user *User) syncLabels(ctx context.Context) error {
}
for _, label := range labels {
user.updateCh <- newMailboxCreatedUpdate(imap.LabelID(label.ID), []string{labelPrefix, label.Path})
for _, addrID := range addrIDs {
user.updateCh[addrID].Enqueue(newMailboxCreatedUpdate(imap.LabelID(label.ID), []string{labelPrefix, label.Path}))
}
}
return nil
@ -84,27 +65,53 @@ func (user *User) syncMessages(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Determine which messages to sync.
// TODO: This needs to be done better using the new API route to retrieve just the message IDs.
metadata, err := user.client.GetAllMessageMetadata(ctx)
if err != nil {
return err
}
// If in split mode, we need to send each message to a different IMAP connector.
isSplitMode := user.vault.AddressMode() == vault.SplitMode
// Collect the build requests -- we need:
// - the message ID to build,
// - the keyring to decrypt the message,
// - and the address to send the message to (for split mode).
requests := xslices.Map(metadata, func(metadata liteapi.MessageMetadata) request {
var addressID string
if isSplitMode {
addressID = metadata.AddressID
} else {
addressID = user.apiAddrs.primary()
}
return request{
messageID: metadata.ID,
addressID: addressID,
addrKR: user.addrKRs[metadata.AddressID],
}
})
flusher := newFlusher(user.ID(), user.updateCh, user.notifyCh, len(metadata), chunkSize)
defer flusher.flush()
// Create the flushers, one per update channel.
flushers := make(map[string]*flusher)
for addrID, updateCh := range user.updateCh {
flusher := newFlusher(user.ID(), updateCh, user.eventCh, len(requests), chunkSize)
defer flusher.flush()
flushers[addrID] = flusher
}
// Build the messages and send them to the correct flusher.
if err := user.builder.Process(ctx, requests, func(req request, res *imap.MessageCreated, err error) error {
if err != nil {
return fmt.Errorf("failed to build message %s: %w", req.messageID, err)
}
flusher.push(res)
flushers[req.addressID].push(res)
return nil
}); err != nil {
@ -114,95 +121,15 @@ func (user *User) syncMessages(ctx context.Context) error {
return nil
}
type flusher struct {
userID string
func (user *User) syncWait() {
for _, updateCh := range user.updateCh {
waiter := imap.NewNoop()
defer waiter.Wait()
updates []*imap.MessageCreated
updateCh chan<- imap.Update
notifyCh chan<- events.Event
maxChunkSize int
curChunkSize int
count int
total int
start time.Time
pushLock sync.Mutex
}
func newFlusher(userID string, updateCh chan<- imap.Update, notifyCh chan<- events.Event, total, maxChunkSize int) *flusher {
return &flusher{
userID: userID,
updateCh: updateCh,
notifyCh: notifyCh,
maxChunkSize: maxChunkSize,
total: total,
start: time.Now(),
updateCh.Enqueue(waiter)
}
}
func (f *flusher) push(update *imap.MessageCreated) {
f.pushLock.Lock()
defer f.pushLock.Unlock()
f.updates = append(f.updates, update)
if f.curChunkSize += len(update.Literal); f.curChunkSize >= f.maxChunkSize {
f.flush()
}
}
func (f *flusher) flush() {
if len(f.updates) == 0 {
return
}
f.count += len(f.updates)
f.updateCh <- imap.NewMessagesCreated(f.updates...)
f.notifyCh <- newSyncProgress(f.userID, f.count, f.total, f.start)
f.updates = nil
f.curChunkSize = 0
}
func newSyncProgress(userID string, count, total int, start time.Time) events.SyncProgress {
return events.SyncProgress{
UserID: userID,
Progress: float64(count) / float64(total),
Elapsed: time.Since(start),
Remaining: time.Since(start) * time.Duration(total-count) / time.Duration(count),
}
}
func getMessageCreatedUpdate(message liteapi.Message, literal []byte) (*imap.MessageCreated, error) {
parsedMessage, err := imap.NewParsedMessage(literal)
if err != nil {
return nil, err
}
flags := imap.NewFlagSet()
if !message.Unread {
flags = flags.Add(imap.FlagSeen)
}
if slices.Contains(message.LabelIDs, liteapi.StarredLabel) {
flags = flags.Add(imap.FlagFlagged)
}
imapMessage := imap.Message{
ID: imap.MessageID(message.ID),
Flags: flags,
Date: time.Unix(message.Time, 0),
}
return &imap.MessageCreated{
Message: imapMessage,
Literal: literal,
LabelIDs: imapLabelIDs(filterLabelIDs(message.LabelIDs)),
ParsedMessage: parsedMessage,
}, nil
}
func newSystemMailboxCreatedUpdate(labelID imap.LabelID, labelName string) *imap.MailboxCreated {
if strings.EqualFold(labelName, imap.Inbox) {
labelName = imap.Inbox
@ -237,18 +164,12 @@ func newMailboxCreatedUpdate(labelID imap.LabelID, labelName []string) *imap.Mai
})
}
func filterLabelIDs(labelIDs []string) []string {
var filteredLabelIDs []string
func wantLabelID(labelID string) bool {
switch labelID {
case liteapi.AllDraftsLabel, liteapi.AllSentLabel, liteapi.OutboxLabel:
return false
for _, labelID := range labelIDs {
switch labelID {
case liteapi.AllDraftsLabel, liteapi.AllSentLabel, liteapi.OutboxLabel:
// ... skip ...
default:
filteredLabelIDs = append(filteredLabelIDs, labelID)
}
default:
return true
}
return filteredLabelIDs
}