diff --git a/internal/bridge/events.go b/internal/bridge/events.go
new file mode 100644
index 00000000..e74ce269
--- /dev/null
+++ b/internal/bridge/events.go
@@ -0,0 +1,35 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package bridge
+
+import (
+ "github.com/ProtonMail/gluon/watcher"
+ "github.com/ProtonMail/proton-bridge/v3/internal/events"
+)
+
+type bridgeEventSubscription struct {
+ b *Bridge
+}
+
+func (b bridgeEventSubscription) Add(ofType ...events.Event) *watcher.Watcher[events.Event] {
+ return b.b.addWatcher(ofType...)
+}
+
+func (b bridgeEventSubscription) Remove(watcher *watcher.Watcher[events.Event]) {
+ b.b.remWatcher(watcher)
+}
diff --git a/internal/services/imapservice/api_client.go b/internal/services/imapservice/api_client.go
new file mode 100644
index 00000000..c3b3dbc8
--- /dev/null
+++ b/internal/services/imapservice/api_client.go
@@ -0,0 +1,50 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "io"
+
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/gopenpgp/v2/crypto"
+ "github.com/bradenaw/juniper/stream"
+)
+
+type APIClient interface {
+ CreateLabel(ctx context.Context, req proton.CreateLabelReq) (proton.Label, error)
+ GetLabel(ctx context.Context, labelID string, labelTypes ...proton.LabelType) (proton.Label, error)
+ UpdateLabel(ctx context.Context, labelID string, req proton.UpdateLabelReq) (proton.Label, error)
+ DeleteLabel(ctx context.Context, labelID string) error
+ LabelMessages(ctx context.Context, messageIDs []string, labelID string) error
+ UnlabelMessages(ctx context.Context, messageIDs []string, labelID string) error
+ GetLabels(ctx context.Context, labelTypes ...proton.LabelType) ([]proton.Label, error)
+
+ GetMessage(ctx context.Context, messageID string) (proton.Message, error)
+ GetMessageMetadataPage(ctx context.Context, page, pageSize int, filter proton.MessageFilter) ([]proton.MessageMetadata, error)
+ GetMessageIDs(ctx context.Context, afterID string) ([]string, error)
+ CreateDraft(ctx context.Context, addrKR *crypto.KeyRing, req proton.CreateDraftReq) (proton.Message, error)
+ UploadAttachment(ctx context.Context, addrKR *crypto.KeyRing, req proton.CreateAttachmentReq) (proton.Attachment, error)
+ ImportMessages(ctx context.Context, addrKR *crypto.KeyRing, workers, buffer int, req ...proton.ImportReq) (stream.Stream[proton.ImportRes], error)
+ GetFullMessage(ctx context.Context, messageID string, scheduler proton.Scheduler, storageProvider proton.AttachmentAllocator) (proton.FullMessage, error)
+ GetAttachmentInto(ctx context.Context, attachmentID string, reader io.ReaderFrom) error
+ GetAttachment(ctx context.Context, attachmentID string) ([]byte, error)
+ DeleteMessage(ctx context.Context, messageIDs ...string) error
+ MarkMessagesRead(ctx context.Context, messageIDs ...string) error
+ MarkMessagesUnread(ctx context.Context, messageIDs ...string) error
+}
diff --git a/internal/services/imapservice/connector.go b/internal/services/imapservice/connector.go
new file mode 100644
index 00000000..2eb102fb
--- /dev/null
+++ b/internal/services/imapservice/connector.go
@@ -0,0 +1,674 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "net/mail"
+ "sync/atomic"
+ "time"
+
+ "github.com/ProtonMail/gluon/async"
+ "github.com/ProtonMail/gluon/connector"
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/gluon/rfc822"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/gopenpgp/v2/crypto"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
+ "github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
+ "github.com/ProtonMail/proton-bridge/v3/pkg/message"
+ "github.com/ProtonMail/proton-bridge/v3/pkg/message/parser"
+ "github.com/bradenaw/juniper/stream"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/exp/slices"
+)
+
+// Connector contains all IMAP state required to satisfy sync and or imap queries.
+type Connector struct {
+ addrID string
+ showAllMail uint32
+
+ flags imap.FlagSet
+ permFlags imap.FlagSet
+ attrs imap.FlagSet
+
+ identityState sharedIdentity
+ client APIClient
+ telemetry Telemetry
+ panicHandler async.PanicHandler
+ sendRecorder *sendrecorder.SendRecorder
+
+ addressMode usertypes.AddressMode
+ labels sharedLabels
+ updateCh *async.QueuedChannel[imap.Update]
+ log *logrus.Entry
+}
+
+func NewConnector(
+ addrID string,
+ apiClient APIClient,
+ labels sharedLabels,
+ identityState sharedIdentity,
+ addressMode usertypes.AddressMode,
+ sendRecorder *sendrecorder.SendRecorder,
+ panicHandler async.PanicHandler,
+ telemetry Telemetry,
+ showAllMail bool,
+) *Connector {
+ userID := identityState.UserID()
+
+ return &Connector{
+ identityState: identityState,
+ addrID: addrID,
+ showAllMail: b32(showAllMail),
+ flags: defaultFlags,
+ permFlags: defaultPermanentFlags,
+ attrs: defaultAttributes,
+
+ client: apiClient,
+ telemetry: telemetry,
+ panicHandler: panicHandler,
+ sendRecorder: sendRecorder,
+
+ updateCh: async.NewQueuedChannel[imap.Update](
+ 0,
+ 0,
+ panicHandler,
+ fmt.Sprintf("connector-update-%v-%v", userID, addrID),
+ ),
+ labels: labels,
+ addressMode: addressMode,
+ log: logrus.WithFields(logrus.Fields{
+ "gluon-connector": addressMode,
+ "addr-id": addrID,
+ "user-id": userID,
+ }),
+ }
+}
+
+func (s *Connector) StateClose() {
+ s.log.Debug("Closing state")
+ s.updateCh.CloseAndDiscardQueued()
+}
+
+func (s *Connector) Authorize(ctx context.Context, username string, password []byte) bool {
+ addrID, err := s.identityState.CheckAuth(username, password, s.telemetry)
+ if err != nil {
+ return false
+ }
+
+ if s.addressMode == usertypes.AddressModeSplit && addrID != s.addrID {
+ return false
+ }
+
+ s.telemetry.SendConfigStatusSuccess(ctx)
+
+ return true
+}
+
+func (s *Connector) CreateMailbox(ctx context.Context, name []string) (imap.Mailbox, error) {
+ if len(name) < 2 {
+ return imap.Mailbox{}, fmt.Errorf("invalid mailbox name %q: %w", name, connector.ErrOperationNotAllowed)
+ }
+
+ switch name[0] {
+ case folderPrefix:
+ return s.createFolder(ctx, name[1:])
+
+ case labelPrefix:
+ return s.createLabel(ctx, name[1:])
+
+ default:
+ return imap.Mailbox{}, fmt.Errorf("invalid mailbox name %q: %w", name, connector.ErrOperationNotAllowed)
+ }
+}
+
+func (s *Connector) GetMessageLiteral(ctx context.Context, id imap.MessageID) ([]byte, error) {
+ msg, err := s.client.GetFullMessage(ctx, string(id), usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator())
+ if err != nil {
+ return nil, err
+ }
+
+ var literal []byte
+ err = s.identityState.WithAddrKR(msg.AddressID, func(_, addrKR *crypto.KeyRing) error {
+ l, buildErr := message.BuildRFC822(addrKR, msg.Message, msg.AttData, defaultMessageJobOpts())
+ if buildErr != nil {
+ return buildErr
+ }
+
+ literal = l
+
+ return nil
+ })
+
+ return literal, err
+}
+
+func (s *Connector) GetMailboxVisibility(_ context.Context, mboxID imap.MailboxID) imap.MailboxVisibility {
+ switch mboxID {
+ case proton.AllMailLabel:
+ if atomic.LoadUint32(&s.showAllMail) != 0 {
+ return imap.Visible
+ }
+ return imap.Hidden
+
+ case proton.AllScheduledLabel:
+ return imap.HiddenIfEmpty
+ default:
+ return imap.Visible
+ }
+}
+
+func (s *Connector) UpdateMailboxName(ctx context.Context, mboxID imap.MailboxID, name []string) error {
+ if len(name) < 2 {
+ return fmt.Errorf("invalid mailbox name %q: %w", name, connector.ErrOperationNotAllowed)
+ }
+
+ switch name[0] {
+ case folderPrefix:
+ return s.updateFolder(ctx, mboxID, name[1:])
+
+ case labelPrefix:
+ return s.updateLabel(ctx, mboxID, name[1:])
+
+ default:
+ return fmt.Errorf("invalid mailbox name %q: %w", name, connector.ErrOperationNotAllowed)
+ }
+}
+
+func (s *Connector) DeleteMailbox(ctx context.Context, mboxID imap.MailboxID) error {
+ if err := s.client.DeleteLabel(ctx, string(mboxID)); err != nil {
+ return err
+ }
+
+ wLabels := s.labels.Write()
+ defer wLabels.Close()
+
+ wLabels.Delete(string(mboxID))
+
+ return nil
+}
+
+func (s *Connector) CreateMessage(ctx context.Context, mailboxID imap.MailboxID, literal []byte, flags imap.FlagSet, _ time.Time) (imap.Message, []byte, error) {
+ if mailboxID == proton.AllMailLabel {
+ return imap.Message{}, nil, connector.ErrOperationNotAllowed
+ }
+
+ toList, err := getLiteralToList(literal)
+ if err != nil {
+ return imap.Message{}, nil, fmt.Errorf("failed to retrieve addresses from literal:%w", err)
+ }
+
+ // Compute the hash of the message (to match it against SMTP messages).
+ hash, err := sendrecorder.GetMessageHash(literal)
+ if err != nil {
+ return imap.Message{}, nil, err
+ }
+
+ // Check if we already tried to send this message recently.
+ if messageID, ok, err := s.sendRecorder.HasEntryWait(ctx, hash, time.Now().Add(90*time.Second), toList); err != nil {
+ return imap.Message{}, nil, fmt.Errorf("failed to check send hash: %w", err)
+ } else if ok {
+ s.log.WithField("messageID", messageID).Warn("Message already sent")
+
+ // Query the server-side message.
+ full, err := s.client.GetFullMessage(ctx, messageID, usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator())
+ if err != nil {
+ return imap.Message{}, nil, fmt.Errorf("failed to fetch message: %w", err)
+ }
+
+ // Build the message as it is on the server.
+ if err := s.identityState.WithAddrKR(full.AddressID, func(_, addrKR *crypto.KeyRing) error {
+ var err error
+
+ if literal, err = message.BuildRFC822(addrKR, full.Message, full.AttData, defaultMessageJobOpts()); err != nil {
+ return err
+ }
+
+ return nil
+ }); err != nil {
+ return imap.Message{}, nil, fmt.Errorf("failed to build message: %w", err)
+ }
+
+ return toIMAPMessage(full.MessageMetadata), literal, nil
+ }
+
+ wantLabelIDs := []string{string(mailboxID)}
+
+ if flags.Contains(imap.FlagFlagged) {
+ wantLabelIDs = append(wantLabelIDs, proton.StarredLabel)
+ }
+
+ var wantFlags proton.MessageFlag
+
+ unread := !flags.Contains(imap.FlagSeen)
+
+ if mailboxID != proton.DraftsLabel {
+ header, err := rfc822.Parse(literal).ParseHeader()
+ if err != nil {
+ return imap.Message{}, nil, err
+ }
+
+ switch {
+ case mailboxID == proton.InboxLabel:
+ wantFlags = wantFlags.Add(proton.MessageFlagReceived)
+
+ case mailboxID == proton.SentLabel:
+ wantFlags = wantFlags.Add(proton.MessageFlagSent)
+
+ case header.Has("Received"):
+ wantFlags = wantFlags.Add(proton.MessageFlagReceived)
+
+ default:
+ wantFlags = wantFlags.Add(proton.MessageFlagSent)
+ }
+ } else {
+ unread = false
+ }
+
+ if flags.Contains(imap.FlagAnswered) {
+ wantFlags = wantFlags.Add(proton.MessageFlagReplied)
+ }
+
+ msg, literal, err := s.importMessage(ctx, literal, wantLabelIDs, wantFlags, unread)
+ if err != nil {
+ if errors.Is(err, proton.ErrImportSizeExceeded) {
+ // Remap error so that Gluon does not put this message in the recovery mailbox.
+ err = fmt.Errorf("%v: %w", err, connector.ErrMessageSizeExceedsLimits)
+ }
+
+ if apiErr := new(proton.APIError); errors.As(err, &apiErr) {
+ s.log.WithError(apiErr).WithField("Details", apiErr.DetailsToString()).Error("Failed to import message")
+ } else {
+ s.log.WithError(err).Error("Failed to import message")
+ }
+ }
+
+ return msg, literal, err
+}
+
+func (s *Connector) AddMessagesToMailbox(ctx context.Context, messageIDs []imap.MessageID, mboxID imap.MailboxID) error {
+ if isAllMailOrScheduled(mboxID) {
+ return connector.ErrOperationNotAllowed
+ }
+
+ return s.client.LabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), string(mboxID))
+}
+
+func (s *Connector) RemoveMessagesFromMailbox(ctx context.Context, messageIDs []imap.MessageID, mboxID imap.MailboxID) error {
+ if isAllMailOrScheduled(mboxID) {
+ return connector.ErrOperationNotAllowed
+ }
+
+ msgIDs := usertypes.MapTo[imap.MessageID, string](messageIDs)
+ if err := s.client.UnlabelMessages(ctx, msgIDs, string(mboxID)); err != nil {
+ return err
+ }
+
+ if mboxID == proton.TrashLabel || mboxID == proton.DraftsLabel {
+ if err := s.client.DeleteMessage(ctx, msgIDs...); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (s *Connector) MoveMessages(ctx context.Context, messageIDs []imap.MessageID, mboxFromID, mboxToID imap.MailboxID) (bool, error) {
+ if (mboxFromID == proton.InboxLabel && mboxToID == proton.SentLabel) ||
+ (mboxFromID == proton.SentLabel && mboxToID == proton.InboxLabel) ||
+ isAllMailOrScheduled(mboxFromID) ||
+ isAllMailOrScheduled(mboxToID) {
+ return false, connector.ErrOperationNotAllowed
+ }
+
+ shouldExpungeOldLocation := func() bool {
+ rdLabels := s.labels.Read()
+ defer rdLabels.Close()
+
+ var result bool
+
+ if v, ok := rdLabels.GetLabel(string(mboxFromID)); ok && v.Type == proton.LabelTypeLabel {
+ result = true
+ }
+
+ if v, ok := rdLabels.GetLabel(string(mboxToID)); ok && (v.Type == proton.LabelTypeFolder || v.Type == proton.LabelTypeSystem) {
+ result = true
+ }
+
+ return result
+ }()
+
+ if err := s.client.LabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), string(mboxToID)); err != nil {
+ return false, fmt.Errorf("labeling messages: %w", err)
+ }
+
+ if shouldExpungeOldLocation {
+ if err := s.client.UnlabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), string(mboxFromID)); err != nil {
+ return false, fmt.Errorf("unlabeling messages: %w", err)
+ }
+ }
+
+ return shouldExpungeOldLocation, nil
+}
+
+func (s *Connector) MarkMessagesSeen(ctx context.Context, messageIDs []imap.MessageID, seen bool) error {
+ if seen {
+ return s.client.MarkMessagesRead(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs)...)
+ }
+
+ return s.client.MarkMessagesUnread(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs)...)
+}
+
+func (s *Connector) MarkMessagesFlagged(ctx context.Context, messageIDs []imap.MessageID, flagged bool) error {
+ if flagged {
+ return s.client.LabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), proton.StarredLabel)
+ }
+
+ return s.client.UnlabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), proton.StarredLabel)
+}
+
+func (s *Connector) GetUpdates() <-chan imap.Update {
+ return s.updateCh.GetChannel()
+}
+
+func (s *Connector) Close(_ context.Context) error {
+ // Nothing to do
+ return nil
+}
+
+func (s *Connector) ShowAllMail(v bool) {
+ atomic.StoreUint32(&s.showAllMail, b32(v))
+}
+
+var (
+ defaultFlags = imap.NewFlagSet(imap.FlagSeen, imap.FlagFlagged, imap.FlagDeleted) // nolint:gochecknoglobals
+ defaultPermanentFlags = imap.NewFlagSet(imap.FlagSeen, imap.FlagFlagged, imap.FlagDeleted) // nolint:gochecknoglobals
+ defaultAttributes = imap.NewFlagSet() // nolint:gochecknoglobals
+)
+
+const (
+ folderPrefix = "Folders"
+ labelPrefix = "Labels"
+)
+
+// b32 returns a uint32 0 or 1 representing b.
+func b32(b bool) uint32 {
+ if b {
+ return 1
+ }
+
+ return 0
+}
+
+func (s *Connector) createLabel(ctx context.Context, name []string) (imap.Mailbox, error) {
+ if len(name) != 1 {
+ return imap.Mailbox{}, fmt.Errorf("a label cannot have children: %w", connector.ErrOperationNotAllowed)
+ }
+
+ label, err := s.client.CreateLabel(ctx, proton.CreateLabelReq{
+ Name: name[0],
+ Color: "#f66",
+ Type: proton.LabelTypeLabel,
+ })
+ if err != nil {
+ return imap.Mailbox{}, err
+ }
+
+ wLabels := s.labels.Write()
+ defer wLabels.Close()
+
+ wLabels.SetLabel(label.ID, label)
+
+ return toIMAPMailbox(label, s.flags, s.permFlags, s.attrs), nil
+}
+
+func (s *Connector) createFolder(ctx context.Context, name []string) (imap.Mailbox, error) {
+ var parentID string
+
+ wLabels := s.labels.Write()
+ defer wLabels.Close()
+
+ if len(name) > 1 {
+ for _, label := range wLabels.GetLabels() {
+ if !slices.Equal(label.Path, name[:len(name)-1]) {
+ continue
+ }
+
+ parentID = label.ID
+
+ break
+ }
+
+ if parentID == "" {
+ return imap.Mailbox{}, fmt.Errorf("parent folder %q does not exist: %w", name[:len(name)-1], connector.ErrOperationNotAllowed)
+ }
+ }
+
+ label, err := s.client.CreateLabel(ctx, proton.CreateLabelReq{
+ Name: name[len(name)-1],
+ Color: "#f66",
+ Type: proton.LabelTypeFolder,
+ ParentID: parentID,
+ })
+ if err != nil {
+ return imap.Mailbox{}, err
+ }
+
+ // Add label to list so subsequent sub folder create requests work correct.
+ wLabels.SetLabel(label.ID, label)
+
+ return toIMAPMailbox(label, s.flags, s.permFlags, s.attrs), nil
+}
+
+func (s *Connector) updateLabel(ctx context.Context, labelID imap.MailboxID, name []string) error {
+ if len(name) != 1 {
+ return fmt.Errorf("a label cannot have children: %w", connector.ErrOperationNotAllowed)
+ }
+
+ label, err := s.client.GetLabel(ctx, string(labelID), proton.LabelTypeLabel)
+ if err != nil {
+ return err
+ }
+
+ update, err := s.client.UpdateLabel(ctx, label.ID, proton.UpdateLabelReq{
+ Name: name[0],
+ Color: label.Color,
+ })
+ if err != nil {
+ return err
+ }
+
+ wLabels := s.labels.Write()
+ defer wLabels.Close()
+
+ wLabels.SetLabel(label.ID, update)
+
+ return nil
+}
+
+func (s *Connector) updateFolder(ctx context.Context, labelID imap.MailboxID, name []string) error {
+ var parentID string
+
+ wLabels := s.labels.Write()
+ defer wLabels.Close()
+
+ if len(name) > 1 {
+ for _, label := range wLabels.GetLabels() {
+ if !slices.Equal(label.Path, name[:len(name)-1]) {
+ continue
+ }
+
+ parentID = label.ID
+
+ break
+ }
+
+ if parentID == "" {
+ return fmt.Errorf("parent folder %q does not exist: %w", name[:len(name)-1], connector.ErrOperationNotAllowed)
+ }
+ }
+
+ label, err := s.client.GetLabel(ctx, string(labelID), proton.LabelTypeFolder)
+ if err != nil {
+ return err
+ }
+
+ update, err := s.client.UpdateLabel(ctx, string(labelID), proton.UpdateLabelReq{
+ Name: name[len(name)-1],
+ Color: label.Color,
+ ParentID: parentID,
+ })
+ if err != nil {
+ return err
+ }
+
+ wLabels.SetLabel(label.ID, update)
+
+ return nil
+}
+
+func (s *Connector) importMessage(
+ ctx context.Context,
+ literal []byte,
+ labelIDs []string,
+ flags proton.MessageFlag,
+ unread bool,
+) (imap.Message, []byte, error) {
+ var full proton.FullMessage
+
+ addr, ok := s.identityState.GetAddress(s.addrID)
+ if !ok {
+ return imap.Message{}, nil, fmt.Errorf("could not find address")
+ }
+
+ if err := s.identityState.WithAddrKR(s.addrID, func(_, addrKR *crypto.KeyRing) error {
+ var messageID string
+
+ if slices.Contains(labelIDs, proton.DraftsLabel) {
+ msg, err := s.createDraft(ctx, literal, addrKR, addr)
+ if err != nil {
+ return fmt.Errorf("failed to create draft: %w", err)
+ }
+
+ // apply labels
+
+ messageID = msg.ID
+ } else {
+ str, err := s.client.ImportMessages(ctx, addrKR, 1, 1, []proton.ImportReq{{
+ Metadata: proton.ImportMetadata{
+ AddressID: s.addrID,
+ LabelIDs: labelIDs,
+ Unread: proton.Bool(unread),
+ Flags: flags,
+ },
+ Message: literal,
+ }}...)
+ if err != nil {
+ return fmt.Errorf("failed to prepare message for import: %w", err)
+ }
+
+ res, err := stream.Collect(ctx, str)
+ if err != nil {
+ return fmt.Errorf("failed to import message: %w", err)
+ }
+
+ messageID = res[0].MessageID
+ }
+
+ var err error
+
+ if full, err = s.client.GetFullMessage(ctx, messageID, usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator()); err != nil {
+ return fmt.Errorf("failed to fetch message: %w", err)
+ }
+
+ if literal, err = message.BuildRFC822(addrKR, full.Message, full.AttData, defaultMessageJobOpts()); err != nil {
+ return fmt.Errorf("failed to build message: %w", err)
+ }
+
+ return nil
+ }); err != nil {
+ return imap.Message{}, nil, err
+ }
+
+ return toIMAPMessage(full.MessageMetadata), literal, nil
+}
+
+func (s *Connector) createDraft(ctx context.Context, literal []byte, addrKR *crypto.KeyRing, sender proton.Address) (proton.Message, error) {
+ // Create a new message parser from the reader.
+ parser, err := parser.New(bytes.NewReader(literal))
+ if err != nil {
+ return proton.Message{}, fmt.Errorf("failed to create parser: %w", err)
+ }
+
+ message, err := message.ParseWithParser(parser, true)
+ if err != nil {
+ return proton.Message{}, fmt.Errorf("failed to parse message: %w", err)
+ }
+
+ decBody := string(message.PlainBody)
+ if message.RichBody != "" {
+ decBody = string(message.RichBody)
+ }
+
+ draft, err := s.client.CreateDraft(ctx, addrKR, proton.CreateDraftReq{
+ Message: proton.DraftTemplate{
+ Subject: message.Subject,
+ Body: decBody,
+ MIMEType: message.MIMEType,
+
+ Sender: &mail.Address{Name: sender.DisplayName, Address: sender.Email},
+ ToList: message.ToList,
+ CCList: message.CCList,
+ BCCList: message.BCCList,
+
+ ExternalID: message.ExternalID,
+ },
+ })
+
+ if err != nil {
+ return proton.Message{}, fmt.Errorf("failed to create draft: %w", err)
+ }
+
+ for _, att := range message.Attachments {
+ disposition := proton.AttachmentDisposition
+ if att.Disposition == "inline" && att.ContentID != "" {
+ disposition = proton.InlineDisposition
+ }
+
+ if _, err := s.client.UploadAttachment(ctx, addrKR, proton.CreateAttachmentReq{
+ MessageID: draft.ID,
+ Filename: att.Name,
+ MIMEType: rfc822.MIMEType(att.MIMEType),
+ Disposition: disposition,
+ ContentID: att.ContentID,
+ Body: att.Data,
+ }); err != nil {
+ return proton.Message{}, fmt.Errorf("failed to add attachment to draft: %w", err)
+ }
+ }
+
+ return draft, nil
+}
+
+func (s *Connector) publishUpdate(_ context.Context, update imap.Update) {
+ s.updateCh.Enqueue(update)
+}
diff --git a/internal/services/imapservice/helpers.go b/internal/services/imapservice/helpers.go
new file mode 100644
index 00000000..af9e0b3e
--- /dev/null
+++ b/internal/services/imapservice/helpers.go
@@ -0,0 +1,191 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "fmt"
+ "net/mail"
+ "time"
+
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/gluon/rfc5322"
+ "github.com/ProtonMail/gluon/rfc822"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/bradenaw/juniper/xslices"
+)
+
+func toIMAPMailbox(label proton.Label, flags, permFlags, attrs imap.FlagSet) imap.Mailbox {
+ if label.Type == proton.LabelTypeLabel {
+ label.Path = append([]string{labelPrefix}, label.Path...)
+ } else if label.Type == proton.LabelTypeFolder {
+ label.Path = append([]string{folderPrefix}, label.Path...)
+ }
+
+ return imap.Mailbox{
+ ID: imap.MailboxID(label.ID),
+ Name: label.Path,
+ Flags: flags,
+ PermanentFlags: permFlags,
+ Attributes: attrs,
+ }
+}
+
+func isAllMailOrScheduled(mailboxID imap.MailboxID) bool {
+ return (mailboxID == proton.AllMailLabel) || (mailboxID == proton.AllScheduledLabel)
+}
+
+func BuildFlagSetFromMessageMetadata(message proton.MessageMetadata) imap.FlagSet {
+ flags := imap.NewFlagSet()
+
+ if message.Seen() {
+ flags.AddToSelf(imap.FlagSeen)
+ }
+
+ if message.Starred() {
+ flags.AddToSelf(imap.FlagFlagged)
+ }
+
+ if message.IsDraft() {
+ flags.AddToSelf(imap.FlagDraft)
+ }
+
+ if message.IsRepliedAll == true || message.IsReplied == true { //nolint: gosimple
+ flags.AddToSelf(imap.FlagAnswered)
+ }
+
+ return flags
+}
+
+func getLiteralToList(literal []byte) ([]string, error) {
+ headerLiteral, _ := rfc822.Split(literal)
+
+ header, err := rfc822.NewHeader(headerLiteral)
+ if err != nil {
+ return nil, err
+ }
+
+ var result []string
+
+ parseAddress := func(field string) error {
+ if fieldAddr, ok := header.GetChecked(field); ok {
+ addr, err := rfc5322.ParseAddressList(fieldAddr)
+ if err != nil {
+ return fmt.Errorf("failed to parse addresses for '%v': %w", field, err)
+ }
+
+ result = append(result, xslices.Map(addr, func(addr *mail.Address) string {
+ return addr.Address
+ })...)
+
+ return nil
+ }
+
+ return nil
+ }
+
+ if err := parseAddress("To"); err != nil {
+ return nil, err
+ }
+
+ if err := parseAddress("Cc"); err != nil {
+ return nil, err
+ }
+
+ if err := parseAddress("Bcc"); err != nil {
+ return nil, err
+ }
+
+ return result, nil
+}
+
+func toIMAPMessage(message proton.MessageMetadata) imap.Message {
+ flags := BuildFlagSetFromMessageMetadata(message)
+
+ var date time.Time
+
+ if message.Time > 0 {
+ date = time.Unix(message.Time, 0)
+ } else {
+ date = time.Now()
+ }
+
+ return imap.Message{
+ ID: imap.MessageID(message.ID),
+ Flags: flags,
+ Date: date,
+ }
+}
+
+func WantLabel(label proton.Label) bool {
+ if label.Type != proton.LabelTypeSystem {
+ return true
+ }
+
+ // nolint:exhaustive
+ switch label.ID {
+ case proton.InboxLabel:
+ return true
+
+ case proton.TrashLabel:
+ return true
+
+ case proton.SpamLabel:
+ return true
+
+ case proton.AllMailLabel:
+ return true
+
+ case proton.ArchiveLabel:
+ return true
+
+ case proton.SentLabel:
+ return true
+
+ case proton.DraftsLabel:
+ return true
+
+ case proton.StarredLabel:
+ return true
+
+ case proton.AllScheduledLabel:
+ return true
+
+ default:
+ return false
+ }
+}
+
+func wantLabels(apiLabels map[string]proton.Label, labelIDs []string) []string {
+ return xslices.Filter(labelIDs, func(labelID string) bool {
+ apiLabel, ok := apiLabels[labelID]
+ if !ok {
+ return false
+ }
+
+ return WantLabel(apiLabel)
+ })
+}
+
+// sleepCtx sleeps for the given duration, or until the context is canceled.
+func sleepCtx(ctx context.Context, d time.Duration) {
+ select {
+ case <-ctx.Done():
+ case <-time.After(d):
+ }
+}
diff --git a/internal/services/imapservice/imap_updates.go b/internal/services/imapservice/imap_updates.go
new file mode 100644
index 00000000..c7f297bd
--- /dev/null
+++ b/internal/services/imapservice/imap_updates.go
@@ -0,0 +1,124 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/go-proton-api"
+)
+
+func newSystemMailboxCreatedUpdate(labelID imap.MailboxID, labelName string) *imap.MailboxCreated {
+ if strings.EqualFold(labelName, imap.Inbox) {
+ labelName = imap.Inbox
+ }
+
+ attrs := imap.NewFlagSet(imap.AttrNoInferiors)
+ permanentFlags := defaultPermanentFlags
+ flags := defaultFlags
+
+ switch labelID {
+ case proton.TrashLabel:
+ attrs = attrs.Add(imap.AttrTrash)
+
+ case proton.SpamLabel:
+ attrs = attrs.Add(imap.AttrJunk)
+
+ case proton.AllMailLabel:
+ attrs = attrs.Add(imap.AttrAll)
+ flags = imap.NewFlagSet(imap.FlagSeen, imap.FlagFlagged)
+ permanentFlags = imap.NewFlagSet(imap.FlagSeen, imap.FlagFlagged)
+
+ case proton.ArchiveLabel:
+ attrs = attrs.Add(imap.AttrArchive)
+
+ case proton.SentLabel:
+ attrs = attrs.Add(imap.AttrSent)
+
+ case proton.DraftsLabel:
+ attrs = attrs.Add(imap.AttrDrafts)
+
+ case proton.StarredLabel:
+ attrs = attrs.Add(imap.AttrFlagged)
+
+ case proton.AllScheduledLabel:
+ labelName = "Scheduled" // API actual name is "All Scheduled"
+ }
+
+ return imap.NewMailboxCreated(imap.Mailbox{
+ ID: labelID,
+ Name: []string{labelName},
+ Flags: flags,
+ PermanentFlags: permanentFlags,
+ Attributes: attrs,
+ })
+}
+
+func waitOnIMAPUpdates(ctx context.Context, updates []imap.Update) error {
+ for _, update := range updates {
+ if err, ok := update.WaitContext(ctx); ok && err != nil {
+ return fmt.Errorf("failed to apply gluon update %v: %w", update.String(), err)
+ }
+ }
+
+ return nil
+}
+
+func newPlaceHolderMailboxCreatedUpdate(labelName string) *imap.MailboxCreated {
+ return imap.NewMailboxCreated(imap.Mailbox{
+ ID: imap.MailboxID(labelName),
+ Name: []string{labelName},
+ Flags: defaultFlags,
+ PermanentFlags: defaultPermanentFlags,
+ Attributes: imap.NewFlagSet(imap.AttrNoSelect),
+ })
+}
+
+func newMailboxCreatedUpdate(labelID imap.MailboxID, labelName []string) *imap.MailboxCreated {
+ return imap.NewMailboxCreated(imap.Mailbox{
+ ID: labelID,
+ Name: labelName,
+ Flags: defaultFlags,
+ PermanentFlags: defaultPermanentFlags,
+ Attributes: imap.NewFlagSet(),
+ })
+}
+
+func GetMailboxName(label proton.Label) []string {
+ var name []string
+
+ switch label.Type {
+ case proton.LabelTypeFolder:
+ name = append([]string{folderPrefix}, label.Path...)
+
+ case proton.LabelTypeLabel:
+ name = append([]string{labelPrefix}, label.Path...)
+
+ case proton.LabelTypeContactGroup:
+ fallthrough
+ case proton.LabelTypeSystem:
+ fallthrough
+ default:
+ name = label.Path
+ }
+
+ return name
+}
diff --git a/internal/services/imapservice/server_manager.go b/internal/services/imapservice/server_manager.go
new file mode 100644
index 00000000..2085f778
--- /dev/null
+++ b/internal/services/imapservice/server_manager.go
@@ -0,0 +1,61 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+
+ "github.com/ProtonMail/gluon/connector"
+)
+
+type IMAPServerManager interface {
+ AddIMAPUser(
+ ctx context.Context,
+ connector connector.Connector,
+ addrID string,
+ idProvider GluonIDProvider,
+ syncStateProvider SyncStateProvider,
+ ) error
+
+ RemoveIMAPUser(ctx context.Context, deleteData bool, provider GluonIDProvider, addrID ...string) error
+}
+
+type NullIMAPServerManager struct{}
+
+func (n NullIMAPServerManager) AddIMAPUser(
+ _ context.Context,
+ _ connector.Connector,
+ _ string,
+ _ GluonIDProvider,
+ _ SyncStateProvider,
+) error {
+ return nil
+}
+
+func (n NullIMAPServerManager) RemoveIMAPUser(
+ _ context.Context,
+ _ bool,
+ _ GluonIDProvider,
+ _ ...string,
+) error {
+ return nil
+}
+
+func NewNullIMAPServerManager() *NullIMAPServerManager {
+ return &NullIMAPServerManager{}
+}
diff --git a/internal/services/imapservice/service.go b/internal/services/imapservice/service.go
new file mode 100644
index 00000000..8a55bfa8
--- /dev/null
+++ b/internal/services/imapservice/service.go
@@ -0,0 +1,591 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ProtonMail/gluon/async"
+ "github.com/ProtonMail/gluon/reporter"
+ "github.com/ProtonMail/gluon/watcher"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/proton-bridge/v3/internal/events"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
+ "github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
+ "github.com/ProtonMail/proton-bridge/v3/internal/vault"
+ "github.com/ProtonMail/proton-bridge/v3/pkg/cpc"
+ "github.com/sirupsen/logrus"
+)
+
+type EventProvider interface {
+ userevents.Subscribable
+ userevents.EventController
+}
+
+type Telemetry interface {
+ useridentity.Telemetry
+ SendConfigStatusSuccess(ctx context.Context)
+}
+
+type GluonIDProvider interface {
+ GetGluonID(addrID string) (string, bool)
+ GetGluonIDs() map[string]string
+ SetGluonID(addrID, gluonID string) error
+ RemoveGluonID(addrID, gluonID string) error
+ GluonKey() []byte
+}
+
+type SyncStateProvider interface {
+ AddFailedMessageID(messageID string) error
+ RemFailedMessageID(messageID string) error
+ GetSyncStatus() vault.SyncStatus
+ ClearSyncStatus() error
+ SetHasLabels(bool) error
+ SetHasMessages(bool) error
+ SetLastMessageID(messageID string) error
+}
+
+type Service struct {
+ log *logrus.Entry
+ cpc *cpc.CPC
+
+ client APIClient
+ identityState *rwIdentity
+ labels *rwLabels
+ addressMode usertypes.AddressMode
+
+ refreshSubscriber *userevents.RefreshChanneledSubscriber
+ addressSubscriber *userevents.AddressChanneledSubscriber
+ userSubscriber *userevents.UserChanneledSubscriber
+ messageSubscriber *userevents.MessageChanneledSubscriber
+ labelSubscriber *userevents.LabelChanneledSubscriber
+
+ gluonIDProvider GluonIDProvider
+ syncStateProvider SyncStateProvider
+ eventProvider EventProvider
+ serverManager IMAPServerManager
+ eventPublisher events.EventPublisher
+
+ telemetry Telemetry
+ panicHandler async.PanicHandler
+ sendRecorder *sendrecorder.SendRecorder
+ reporter reporter.Reporter
+
+ eventSubscription events.Subscription
+ eventWatcher *watcher.Watcher[events.Event]
+ connectors map[string]*Connector
+ maxSyncMemory uint64
+ showAllMail bool
+}
+
+func NewService(
+ client APIClient,
+ identityState *useridentity.State,
+ gluonIDProvider GluonIDProvider,
+ syncStateProvider SyncStateProvider,
+ eventProvider EventProvider,
+ serverManager IMAPServerManager,
+ eventPublisher events.EventPublisher,
+ bridgePassProvider useridentity.BridgePassProvider,
+ keyPassProvider useridentity.KeyPassProvider,
+ panicHandler async.PanicHandler,
+ sendRecorder *sendrecorder.SendRecorder,
+ telemetry Telemetry,
+ reporter reporter.Reporter,
+ addressMode usertypes.AddressMode,
+ subscription events.Subscription,
+ maxSyncMemory uint64,
+ showAllMail bool,
+) *Service {
+ subscriberName := fmt.Sprintf("imap-%v", identityState.User.ID)
+
+ return &Service{
+ cpc: cpc.NewCPC(),
+ log: logrus.WithFields(logrus.Fields{
+ "user": identityState.User.ID,
+ "service": "imap",
+ }),
+ client: client,
+ identityState: newRWIdentity(identityState, bridgePassProvider, keyPassProvider),
+ labels: newRWLabels(),
+ addressMode: addressMode,
+
+ gluonIDProvider: gluonIDProvider,
+ serverManager: serverManager,
+ syncStateProvider: syncStateProvider,
+ eventProvider: eventProvider,
+ eventPublisher: eventPublisher,
+
+ refreshSubscriber: userevents.NewRefreshSubscriber(subscriberName),
+ addressSubscriber: userevents.NewAddressSubscriber(subscriberName),
+ userSubscriber: userevents.NewUserSubscriber(subscriberName),
+ messageSubscriber: userevents.NewMessageSubscriber(subscriberName),
+ labelSubscriber: userevents.NewLabelSubscriber(subscriberName),
+
+ panicHandler: panicHandler,
+ sendRecorder: sendRecorder,
+ telemetry: telemetry,
+ reporter: reporter,
+
+ connectors: make(map[string]*Connector),
+ maxSyncMemory: maxSyncMemory,
+
+ eventWatcher: subscription.Add(events.IMAPServerCreated{}),
+ eventSubscription: subscription,
+ showAllMail: showAllMail,
+ }
+}
+
+func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error {
+ // Get user labels
+ apiLabels, err := s.client.GetLabels(ctx, proton.LabelTypeSystem, proton.LabelTypeFolder, proton.LabelTypeLabel)
+ if err != nil {
+ return fmt.Errorf("failed to get labels: %w", err)
+ }
+
+ s.labels.SetLabels(apiLabels)
+
+ {
+ connectors, err := s.buildConnectors()
+ if err != nil {
+ s.log.WithError(err).Error("Failed to build connectors")
+ return err
+ }
+ s.connectors = connectors
+ }
+
+ if err := s.addConnectorsToServer(ctx, s.connectors); err != nil {
+ return err
+ }
+
+ group.Go(ctx, s.identityState.identity.User.ID, "imap-service", s.run)
+ return nil
+}
+
+func (s *Service) SetAddressMode(ctx context.Context, mode usertypes.AddressMode) error {
+ _, err := s.cpc.Send(ctx, &setAddressModeReq{mode: mode})
+
+ return err
+}
+
+func (s *Service) Resync(ctx context.Context) error {
+ _, err := s.cpc.Send(ctx, &resyncReq{})
+
+ return err
+}
+
+func (s *Service) CancelSync(ctx context.Context) error {
+ _, err := s.cpc.Send(ctx, &cancelSyncReq{})
+
+ return err
+}
+
+func (s *Service) ResumeSync(ctx context.Context) error {
+ _, err := s.cpc.Send(ctx, &cancelSyncReq{})
+
+ return err
+}
+
+func (s *Service) OnBadEvent(ctx context.Context) error {
+ _, err := s.cpc.Send(ctx, &onBadEventReq{})
+
+ return err
+}
+
+func (s *Service) OnBadEventResync(ctx context.Context) error {
+ _, err := s.cpc.Send(ctx, &onBadEventResyncReq{})
+
+ return err
+}
+
+func (s *Service) OnLogout(ctx context.Context) error {
+ _, err := s.cpc.Send(ctx, &onLogoutReq{})
+
+ return err
+}
+
+func (s *Service) ShowAllMail(ctx context.Context, v bool) error {
+ _, err := s.cpc.Send(ctx, &showAllMailReq{v: v})
+
+ return err
+}
+
+func (s *Service) GetLabels(ctx context.Context) (map[string]proton.Label, error) {
+ return cpc.SendTyped[map[string]proton.Label](ctx, s.cpc, &getLabelsReq{})
+}
+
+func (s *Service) Close() {
+ for _, c := range s.connectors {
+ c.StateClose()
+ }
+
+ s.connectors = make(map[string]*Connector)
+}
+
+func (s *Service) run(ctx context.Context) { //nolint gocyclo
+ s.log.Info("Starting IMAP Service")
+ defer s.log.Info("Exiting IMAP Service")
+
+ defer s.cpc.Close()
+ defer s.eventSubscription.Remove(s.eventWatcher)
+
+ syncHandler := newSyncHandler(ctx, s.panicHandler)
+ defer syncHandler.Close()
+
+ syncHandler.launch(s)
+
+ subscription := userevents.Subscription{
+ User: s.userSubscriber,
+ Refresh: s.refreshSubscriber,
+ Address: s.addressSubscriber,
+ Labels: s.labelSubscriber,
+ Messages: s.messageSubscriber,
+ }
+
+ s.eventProvider.Subscribe(subscription)
+ defer s.eventProvider.Unsubscribe(subscription)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+
+ case req, ok := <-s.cpc.ReceiveCh():
+ if !ok {
+ continue
+ }
+ switch r := req.Value().(type) {
+ case *setAddressModeReq:
+ err := s.setAddressMode(ctx, syncHandler, r.mode)
+ req.Reply(ctx, nil, err)
+
+ case *resyncReq:
+ s.log.Info("Received resync request, handling as refresh event")
+ err := s.onRefreshEvent(ctx, syncHandler)
+ req.Reply(ctx, nil, err)
+ s.log.Info("Resync reply sent, handling as refresh event")
+
+ case *cancelSyncReq:
+ s.log.Info("Cancelling sync")
+ syncHandler.Cancel()
+ req.Reply(ctx, nil, nil)
+
+ case *resumeSyncReq:
+ s.log.Info("Resuming sync")
+ // Cancel previous run, if any, just in case.
+ syncHandler.CancelAndWait()
+ syncHandler.launch(s)
+ req.Reply(ctx, nil, nil)
+ case *getLabelsReq:
+ labels := s.labels.GetLabelMap()
+ req.Reply(ctx, labels, nil)
+
+ case *onBadEventReq:
+ err := s.removeConnectorsFromServer(ctx, s.connectors, false)
+ req.Reply(ctx, nil, err)
+
+ case *onBadEventResyncReq:
+ err := s.addConnectorsToServer(ctx, s.connectors)
+ req.Reply(ctx, nil, err)
+
+ case *onLogoutReq:
+ err := s.removeConnectorsFromServer(ctx, s.connectors, false)
+ req.Reply(ctx, nil, err)
+
+ case *showAllMailReq:
+ req.Reply(ctx, nil, nil)
+ s.setShowAllMail(r.v)
+
+ default:
+ s.log.Error("Received unknown request")
+ }
+
+ case err, ok := <-syncHandler.OnSyncFinishedCH():
+ {
+ if !ok {
+ continue
+ }
+
+ if err != nil {
+ s.log.WithError(err).Error("Sync failed")
+ continue
+ }
+
+ s.log.Info("Sync complete, starting API event stream")
+ s.eventProvider.Resume()
+ }
+
+ case update, ok := <-syncHandler.updater.ch:
+ if !ok {
+ continue
+ }
+ s.onSyncUpdate(ctx, update)
+
+ case e, ok := <-s.userSubscriber.OnEventCh():
+ if !ok {
+ continue
+ }
+ e.Consume(func(user proton.User) error {
+ return s.onUserEvent(user)
+ })
+ case e, ok := <-s.addressSubscriber.OnEventCh():
+ if !ok {
+ continue
+ }
+ e.Consume(func(events []proton.AddressEvent) error {
+ return s.onAddressEvent(ctx, events)
+ })
+ case e, ok := <-s.labelSubscriber.OnEventCh():
+ if !ok {
+ continue
+ }
+ e.Consume(func(events []proton.LabelEvent) error {
+ return s.onLabelEvent(ctx, events)
+ })
+ case e, ok := <-s.messageSubscriber.OnEventCh():
+ if !ok {
+ continue
+ }
+ e.Consume(func(events []proton.MessageEvent) error {
+ return s.onMessageEvent(ctx, events)
+ })
+ case e, ok := <-s.refreshSubscriber.OnEventCh():
+ if !ok {
+ continue
+ }
+ e.Consume(func(_ proton.RefreshFlag) error {
+ return s.onRefreshEvent(ctx, syncHandler)
+ })
+ case e, ok := <-s.eventWatcher.GetChannel():
+ if !ok {
+ continue
+ }
+
+ if _, ok := e.(events.IMAPServerCreated); ok {
+ if err := s.addConnectorsToServer(ctx, s.connectors); err != nil {
+ s.log.WithError(err).Error("Failed to add connector to server after created")
+ }
+ }
+ }
+ }
+}
+
+func (s *Service) onRefreshEvent(ctx context.Context, handler *syncHandler) error {
+ s.log.Debug("handling refresh event")
+
+ if err := s.identityState.Write(func(identity *useridentity.State) error {
+ return identity.OnRefreshEvent(ctx)
+ }); err != nil {
+ s.log.WithError(err).Error("Failed to apply refresh event to identity state")
+ return err
+ }
+
+ handler.CancelAndWait()
+
+ if err := s.removeConnectorsFromServer(ctx, s.connectors, true); err != nil {
+ return err
+ }
+
+ if err := s.syncStateProvider.ClearSyncStatus(); err != nil {
+ return fmt.Errorf("failed to clear sync status:%w", err)
+ }
+
+ if err := s.addConnectorsToServer(ctx, s.connectors); err != nil {
+ return err
+ }
+
+ handler.launch(s)
+
+ return nil
+}
+
+func (s *Service) onUserEvent(user proton.User) error {
+ s.log.Debug("handling user event")
+ return s.identityState.Write(func(identity *useridentity.State) error {
+ identity.OnUserEvent(user)
+ return nil
+ })
+}
+
+func (s *Service) buildConnectors() (map[string]*Connector, error) {
+ connectors := make(map[string]*Connector)
+
+ if s.addressMode == usertypes.AddressModeCombined {
+ addr, err := s.identityState.GetPrimaryAddress()
+ if err != nil {
+ return nil, fmt.Errorf("failed to build connector for combined mode: %w", err)
+ }
+
+ connectors[addr.ID] = NewConnector(
+ addr.ID,
+ s.client,
+ s.labels,
+ s.identityState,
+ s.addressMode,
+ s.sendRecorder,
+ s.panicHandler,
+ s.telemetry,
+ s.showAllMail,
+ )
+
+ return connectors, nil
+ }
+
+ for _, addr := range s.identityState.GetAddresses() {
+ connectors[addr.ID] = NewConnector(
+ addr.ID,
+ s.client,
+ s.labels,
+ s.identityState,
+ s.addressMode,
+ s.sendRecorder,
+ s.panicHandler,
+ s.telemetry,
+ s.showAllMail,
+ )
+ }
+
+ return connectors, nil
+}
+
+func (s *Service) rebuildConnectors() error {
+ newConnectors, err := s.buildConnectors()
+ if err != nil {
+ return err
+ }
+
+ for _, c := range s.connectors {
+ c.StateClose()
+ }
+
+ s.connectors = newConnectors
+
+ return nil
+}
+
+func (s *Service) onSyncUpdate(ctx context.Context, syncUpdate syncUpdate) {
+ c, ok := s.connectors[syncUpdate.addrID]
+ if !ok {
+ s.log.Warningf("Received syncUpdate for unknown addr (%v), connector may have been removed", syncUpdate.addrID)
+ syncUpdate.update.Done(fmt.Errorf("undeliverable"))
+ return
+ }
+
+ c.publishUpdate(ctx, syncUpdate.update)
+}
+
+func (s *Service) addConnectorsToServer(ctx context.Context, connectors map[string]*Connector) error {
+ addedConnectors := make([]string, 0, len(connectors))
+ for _, c := range connectors {
+ if err := s.serverManager.AddIMAPUser(ctx, c, c.addrID, s.gluonIDProvider, s.syncStateProvider); err != nil {
+ s.log.WithError(err).Error("Failed to add connect to imap server")
+
+ if err := s.serverManager.RemoveIMAPUser(ctx, false, s.gluonIDProvider, addedConnectors...); err != nil {
+ s.log.WithError(err).Error("Failed to remove previously added connectors after failure")
+ }
+ }
+ addedConnectors = append(addedConnectors, c.addrID)
+ }
+
+ return nil
+}
+
+func (s *Service) removeConnectorsFromServer(ctx context.Context, connectors map[string]*Connector, deleteData bool) error {
+ addrIDs := make([]string, 0, len(connectors))
+
+ for _, c := range connectors {
+ addrIDs = append(addrIDs, c.addrID)
+ }
+
+ if err := s.serverManager.RemoveIMAPUser(ctx, deleteData, s.gluonIDProvider, addrIDs...); err != nil {
+ return fmt.Errorf("failed to remove gluon users from server: %w", err)
+ }
+
+ return nil
+}
+
+func (s *Service) setAddressMode(ctx context.Context, handler *syncHandler, mode usertypes.AddressMode) error {
+ if s.addressMode == mode {
+ return nil
+ }
+
+ s.addressMode = mode
+ if mode == usertypes.AddressModeSplit {
+ s.log.Info("Setting Split Address Mode")
+ } else {
+ s.log.Info("Setting Combined Address Mode")
+ }
+
+ handler.CancelAndWait()
+
+ if err := s.removeConnectorsFromServer(ctx, s.connectors, true); err != nil {
+ return err
+ }
+
+ if err := s.syncStateProvider.ClearSyncStatus(); err != nil {
+ return fmt.Errorf("failed to clear sync status:%w", err)
+ }
+
+ if err := s.rebuildConnectors(); err != nil {
+ return fmt.Errorf("failed to rebuild connectors: %w", err)
+ }
+
+ if err := s.addConnectorsToServer(ctx, s.connectors); err != nil {
+ return err
+ }
+
+ handler.launch(s)
+
+ return nil
+}
+
+func (s *Service) setShowAllMail(v bool) {
+ if s.showAllMail == v {
+ return
+ }
+
+ s.showAllMail = v
+
+ for _, c := range s.connectors {
+ c.ShowAllMail(v)
+ }
+}
+
+type resyncReq struct{}
+
+type cancelSyncReq struct{}
+
+type resumeSyncReq struct{}
+
+type getLabelsReq struct{}
+
+type onBadEventReq struct{}
+
+type onBadEventResyncReq struct{}
+
+type onLogoutReq struct{}
+
+type showAllMailReq struct{ v bool }
+
+type setAddressModeReq struct {
+ mode usertypes.AddressMode
+}
diff --git a/internal/services/imapservice/service_address_events.go b/internal/services/imapservice/service_address_events.go
new file mode 100644
index 00000000..71ca5c85
--- /dev/null
+++ b/internal/services/imapservice/service_address_events.go
@@ -0,0 +1,166 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
+ "github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
+)
+
+func (s *Service) onAddressEvent(ctx context.Context, events []proton.AddressEvent) error {
+ s.log.Debug("handling address event")
+
+ if s.addressMode == usertypes.AddressModeCombined {
+ if err := s.identityState.Write(func(identity *useridentity.State) error {
+ identity.OnAddressEvents(events)
+ return nil
+ }); err != nil {
+ s.log.WithError(err).Error("Failed to apply address events to identity state")
+ return err
+ }
+
+ return nil
+ }
+
+ if s.addressMode != usertypes.AddressModeSplit {
+ return nil
+ }
+
+ for _, event := range events {
+ switch event.Action {
+ case proton.EventCreate:
+ var status useridentity.AddressUpdate
+
+ if err := s.identityState.Write(func(identity *useridentity.State) error {
+ status = identity.OnAddressCreated(event)
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("failed to update identity state: %w", err)
+ }
+
+ if status == useridentity.AddressUpdateCreated {
+ if err := addNewAddressSplitMode(ctx, s, event.Address.ID); err != nil {
+ return err
+ }
+ }
+
+ case proton.EventUpdateFlags, proton.EventUpdate:
+ var addr proton.Address
+ var status useridentity.AddressUpdate
+
+ if err := s.identityState.Write(func(identity *useridentity.State) error {
+ addr, status = identity.OnAddressUpdated(event)
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("failed to update identity state: %w", err)
+ }
+
+ // nolint:exhaustive
+ switch status {
+ case useridentity.AddressUpdateCreated:
+ if err := addNewAddressSplitMode(ctx, s, addr.ID); err != nil {
+ return err
+ }
+
+ case useridentity.AddressUpdateDisabled:
+ if err := removeAddressSplitMode(ctx, s, addr.ID); err != nil {
+ return err
+ }
+
+ case useridentity.AddressUpdateEnabled:
+ if err := addNewAddressSplitMode(ctx, s, addr.ID); err != nil {
+ return err
+ }
+
+ default:
+ continue
+ }
+
+ case proton.EventDelete:
+ var status useridentity.AddressUpdate
+
+ if err := s.identityState.Write(func(identity *useridentity.State) error {
+ _, status = identity.OnAddressDeleted(event)
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("failed to update identity state: %w", err)
+ }
+ if status == useridentity.AddressUpdateDeleted {
+ if err := removeAddressSplitMode(ctx, s, event.ID); err != nil {
+ return err
+ }
+ }
+
+ default:
+ return fmt.Errorf("unknown event action: %v", event.Action)
+ }
+ }
+
+ return nil
+}
+
+func addNewAddressSplitMode(ctx context.Context, s *Service, addrID string) error {
+ connector := NewConnector(
+ addrID,
+ s.client,
+ s.labels,
+ s.identityState,
+ s.addressMode,
+ s.sendRecorder,
+ s.panicHandler,
+ s.telemetry,
+ s.showAllMail,
+ )
+
+ if err := s.serverManager.AddIMAPUser(ctx, connector, connector.addrID, s.gluonIDProvider, s.syncStateProvider); err != nil {
+ return fmt.Errorf("failed to add new account to server: %w", err)
+ }
+
+ s.connectors[connector.addrID] = connector
+
+ if err := syncLabels(ctx, s.labels.GetLabelMap(), connector); err != nil {
+ return fmt.Errorf("failed to sync labels for new address: %w", err)
+ }
+
+ return nil
+}
+
+func removeAddressSplitMode(ctx context.Context, s *Service, addrID string) error {
+ connector, ok := s.connectors[addrID]
+ if !ok {
+ s.log.Warnf("Could not find connector ")
+ return nil
+ }
+
+ if err := s.serverManager.RemoveIMAPUser(ctx, true, s.gluonIDProvider, addrID); err != nil {
+ return fmt.Errorf("failed to remove user from server: %w", err)
+ }
+
+ connector.StateClose()
+
+ delete(s.connectors, addrID)
+
+ return nil
+}
diff --git a/internal/services/imapservice/service_label_events.go b/internal/services/imapservice/service_label_events.go
new file mode 100644
index 00000000..b9ed2889
--- /dev/null
+++ b/internal/services/imapservice/service_label_events.go
@@ -0,0 +1,179 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net/http"
+
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/proton-bridge/v3/internal/events"
+ "github.com/ProtonMail/proton-bridge/v3/internal/logging"
+ "github.com/bradenaw/juniper/xslices"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/exp/maps"
+)
+
+func (s *Service) onLabelEvent(ctx context.Context, events []proton.LabelEvent) error {
+ s.log.Debug("handling label event")
+
+ for _, event := range events {
+ switch event.Action {
+ case proton.EventCreate:
+ updates := onLabelCreated(ctx, s, event)
+
+ if err := waitOnIMAPUpdates(ctx, updates); err != nil {
+ return err
+ }
+
+ case proton.EventUpdateFlags, proton.EventUpdate:
+ updates, err := onLabelUpdated(ctx, s, event)
+ if err != nil {
+ return fmt.Errorf("failed to handle update label event: %w", err)
+ }
+
+ if err := waitOnIMAPUpdates(ctx, updates); err != nil {
+ return err
+ }
+
+ case proton.EventDelete:
+ updates := onLabelDeleted(ctx, s, event)
+
+ if err := waitOnIMAPUpdates(ctx, updates); err != nil {
+ return fmt.Errorf("failed to handle delete label event in gluon: %w", err)
+ }
+ }
+ }
+
+ return nil
+}
+
+func onLabelCreated(ctx context.Context, s *Service, event proton.LabelEvent) []imap.Update {
+ updates := make([]imap.Update, 0, len(s.connectors))
+
+ s.log.WithFields(logrus.Fields{
+ "labelID": event.ID,
+ "name": logging.Sensitive(event.Label.Name),
+ }).Info("Handling label created event")
+
+ wr := s.labels.Write()
+ defer wr.Close()
+
+ wr.SetLabel(event.Label.ID, event.Label)
+
+ for _, updateCh := range maps.Values(s.connectors) {
+ update := newMailboxCreatedUpdate(imap.MailboxID(event.ID), GetMailboxName(event.Label))
+ updateCh.publishUpdate(ctx, update)
+ updates = append(updates, update)
+ }
+
+ s.eventPublisher.PublishEvent(ctx, events.UserLabelCreated{
+ UserID: s.identityState.UserID(),
+ LabelID: event.Label.ID,
+ Name: event.Label.Name,
+ })
+
+ return updates
+}
+
+func onLabelUpdated(ctx context.Context, s *Service, event proton.LabelEvent) ([]imap.Update, error) {
+ var updates []imap.Update
+
+ s.log.WithFields(logrus.Fields{
+ "labelID": event.ID,
+ "name": logging.Sensitive(event.Label.Name),
+ }).Info("Handling label updated event")
+
+ stack := []proton.Label{event.Label}
+
+ wr := s.labels.Write()
+ defer wr.Close()
+
+ for len(stack) > 0 {
+ label := stack[0]
+ stack = stack[1:]
+
+ // Only update the label if it exists; we don't want to create it as a client may have just deleted it.
+ if _, ok := wr.GetLabel(label.ID); ok {
+ wr.SetLabel(label.ID, event.Label)
+ }
+
+ // API doesn't notify us that the path has changed. We need to fetch it again.
+ apiLabel, err := s.client.GetLabel(ctx, label.ID, label.Type)
+ if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status == http.StatusUnprocessableEntity {
+ s.log.WithError(apiErr).Warn("Failed to get label: label does not exist")
+ continue
+ } else if err != nil {
+ return nil, fmt.Errorf("failed to get label %q: %w", label.ID, err)
+ }
+
+ // Update the label in the map.
+ wr.SetLabel(apiLabel.ID, apiLabel)
+
+ // Notify the IMAP clients.
+ for _, updateCh := range maps.Values(s.connectors) {
+ update := imap.NewMailboxUpdated(
+ imap.MailboxID(apiLabel.ID),
+ GetMailboxName(apiLabel),
+ )
+ updateCh.publishUpdate(ctx, update)
+ updates = append(updates, update)
+ }
+
+ s.eventPublisher.PublishEvent(ctx, events.UserLabelUpdated{
+ UserID: s.identityState.UserID(),
+ LabelID: apiLabel.ID,
+ Name: apiLabel.Name,
+ })
+
+ children := xslices.Filter(wr.GetLabels(), func(other proton.Label) bool {
+ return other.ParentID == label.ID
+ })
+
+ stack = append(stack, children...)
+ }
+
+ return updates, nil
+}
+
+func onLabelDeleted(ctx context.Context, s *Service, event proton.LabelEvent) []imap.Update {
+ updates := make([]imap.Update, 0, len(s.connectors))
+
+ s.log.WithField("labelID", event.ID).Info("Handling label deleted event")
+
+ for _, updateCh := range maps.Values(s.connectors) {
+ update := imap.NewMailboxDeleted(imap.MailboxID(event.ID))
+ updateCh.publishUpdate(ctx, update)
+ updates = append(updates, update)
+ }
+
+ wr := s.labels.Write()
+ wr.Close()
+
+ wr.Delete(event.ID)
+
+ s.eventPublisher.PublishEvent(ctx, events.UserLabelDeleted{
+ UserID: s.identityState.UserID(),
+ LabelID: event.ID,
+ })
+
+ return updates
+}
diff --git a/internal/services/imapservice/service_message_events.go b/internal/services/imapservice/service_message_events.go
new file mode 100644
index 00000000..9257ffe5
--- /dev/null
+++ b/internal/services/imapservice/service_message_events.go
@@ -0,0 +1,346 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "net/http"
+
+ "github.com/ProtonMail/gluon"
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/gluon/reporter"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/gopenpgp/v2/crypto"
+ "github.com/ProtonMail/proton-bridge/v3/internal"
+ "github.com/ProtonMail/proton-bridge/v3/internal/logging"
+ "github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/exp/maps"
+)
+
+func (s *Service) onMessageEvent(ctx context.Context, events []proton.MessageEvent) error {
+ s.log.Debug("handling message event")
+
+ for _, event := range events {
+ ctx = logging.WithLogrusField(ctx, "messageID", event.ID)
+
+ switch event.Action {
+ case proton.EventCreate:
+ updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message)
+ if err != nil {
+ reportError(s.reporter, s.log, "Failed to apply create message event", err)
+ return fmt.Errorf("failed to handle create message event: %w", err)
+ }
+
+ if err := waitOnIMAPUpdates(ctx, updates); err != nil {
+ return err
+ }
+
+ case proton.EventUpdate, proton.EventUpdateFlags:
+ // Draft update means to completely remove old message and upload the new data again, but we should
+ // only do this if the event is of type EventUpdate otherwise label switch operations will not work.
+ if (event.Message.IsDraft() || (event.Message.Flags&proton.MessageFlagSent != 0)) && event.Action == proton.EventUpdate {
+ updates, err := onMessageUpdateDraftOrSent(
+ logging.WithLogrusField(ctx, "action", "update draft or sent message"),
+ s,
+ event,
+ )
+ if err != nil {
+ reportError(s.reporter, s.log, "Failed to apply update draft message event", err)
+ return fmt.Errorf("failed to handle update draft event: %w", err)
+ }
+
+ if err := waitOnIMAPUpdates(ctx, updates); err != nil {
+ return err
+ }
+
+ continue
+ }
+
+ // GODT-2028 - Use better events here. It should be possible to have 3 separate events that refrain to
+ // whether the flags, labels or read only data (header+body) has been changed. This requires fixing proton
+ // first so that it correctly reports those cases.
+ // Issue regular update to handle mailboxes and flag changes.
+ updates, err := onMessageUpdate(
+ logging.WithLogrusField(ctx, "action", "update message"),
+ s,
+ event.Message,
+ )
+ if err != nil {
+ reportError(s.reporter, s.log, "Failed to apply update message event", err)
+ return fmt.Errorf("failed to handle update message event: %w", err)
+ }
+
+ // If the update fails on the gluon side because it doesn't exist, we try to create the message instead.
+ if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) {
+ s.log.WithError(err).Error("Failed to handle update message event in gluon, will try creating it")
+
+ updates, err := onMessageCreated(ctx, s, event.Message)
+ if err != nil {
+ return fmt.Errorf("failed to handle update message event as create: %w", err)
+ }
+
+ if err := waitOnIMAPUpdates(ctx, updates); err != nil {
+ return err
+ }
+ } else if err != nil {
+ return err
+ }
+
+ case proton.EventDelete:
+ updates := onMessageDeleted(
+ logging.WithLogrusField(ctx, "action", "delete message"),
+ s,
+ event,
+ )
+
+ if err := waitOnIMAPUpdates(ctx, updates); err != nil {
+ return fmt.Errorf("failed to handle delete message event in gluon: %w", err)
+ }
+ }
+ }
+
+ return nil
+}
+
+func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) {
+ s.log.WithFields(logrus.Fields{
+ "messageID": message.ID,
+ "subject": logging.Sensitive(message.Subject),
+ }).Info("Handling message created event")
+
+ full, err := s.client.GetFullMessage(ctx, message.ID, usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator())
+ if err != nil {
+ // If the message is not found, it means that it has been deleted before we could fetch it.
+ if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status == http.StatusUnprocessableEntity {
+ s.log.WithField("messageID", message.ID).Warn("Cannot create new message: full message is missing on API")
+ return nil, nil
+ }
+
+ return nil, fmt.Errorf("failed to get full message: %w", err)
+ }
+
+ var update imap.Update
+
+ apiLabels := s.labels.GetLabelMap()
+
+ if err := s.identityState.WithAddrKR(message.AddressID, func(_, addrKR *crypto.KeyRing) error {
+ res := buildRFC822(apiLabels, full, addrKR, new(bytes.Buffer))
+
+ if res.err != nil {
+ s.log.WithError(err).Error("Failed to build RFC822 message")
+
+ if err := s.syncStateProvider.AddFailedMessageID(message.ID); err != nil {
+ s.log.WithError(err).Error("Failed to add failed message ID to vault")
+ }
+
+ reportErrorAndMessageID(s.reporter, s.log, "Failed to build message (event create)", res.err, res.messageID)
+
+ return nil
+ }
+
+ if err := s.syncStateProvider.RemFailedMessageID(message.ID); err != nil {
+ s.log.WithError(err).Error("Failed to remove failed message ID from vault")
+ }
+
+ update = imap.NewMessagesCreated(false, res.update)
+ didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update)
+ if err != nil {
+ return err
+ }
+
+ if !didPublish {
+ update = nil
+ }
+
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+
+ if update == nil {
+ return nil, nil
+ }
+
+ return []imap.Update{update}, nil
+}
+
+func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.MessageEvent) ([]imap.Update, error) {
+ s.log.WithFields(logrus.Fields{
+ "messageID": event.ID,
+ "subject": logging.Sensitive(event.Message.Subject),
+ "isDraft": event.Message.IsDraft(),
+ }).Info("Handling draft or sent updated event")
+
+ full, err := s.client.GetFullMessage(ctx, event.Message.ID, usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator())
+ if err != nil {
+ // If the message is not found, it means that it has been deleted before we could fetch it.
+ if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status == http.StatusUnprocessableEntity {
+ s.log.WithField("messageID", event.Message.ID).Warn("Cannot update message: full message is missing on API")
+ return nil, nil
+ }
+
+ return nil, fmt.Errorf("failed to get full draft: %w", err)
+ }
+
+ var update imap.Update
+
+ apiLabels := s.labels.GetLabelMap()
+
+ if err := s.identityState.WithAddrKR(event.Message.AddressID, func(_, addrKR *crypto.KeyRing) error {
+ res := buildRFC822(apiLabels, full, addrKR, new(bytes.Buffer))
+
+ if res.err != nil {
+ logrus.WithError(err).Error("Failed to build RFC822 message")
+
+ if err := s.syncStateProvider.AddFailedMessageID(event.ID); err != nil {
+ s.log.WithError(err).Error("Failed to add failed message ID to vault")
+ }
+
+ reportErrorAndMessageID(s.reporter, s.log, "Failed to build draft message (event update)", res.err, res.messageID)
+
+ return nil
+ }
+
+ if err := s.syncStateProvider.RemFailedMessageID(event.ID); err != nil {
+ s.log.WithError(err).Error("Failed to remove failed message ID from vault")
+ }
+
+ update = imap.NewMessageUpdated(
+ res.update.Message,
+ res.update.Literal,
+ res.update.MailboxIDs,
+ res.update.ParsedMessage,
+ true, // Is the message doesn't exist, silently create it.
+ )
+
+ didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update)
+ if err != nil {
+ return err
+ }
+
+ if !didPublish {
+ update = nil
+ }
+
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+
+ if update == nil {
+ return nil, nil
+ }
+
+ return []imap.Update{update}, nil
+}
+
+func onMessageUpdate(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) {
+ s.log.WithFields(logrus.Fields{
+ "messageID": message.ID,
+ "subject": logging.Sensitive(message.Subject),
+ }).Info("Handling message updated event")
+
+ flags := BuildFlagSetFromMessageMetadata(message)
+
+ update := imap.NewMessageMailboxesUpdated(
+ imap.MessageID(message.ID),
+ usertypes.MapTo[string, imap.MailboxID](wantLabels(s.labels.GetLabelMap(), message.LabelIDs)),
+ flags,
+ )
+
+ didPublish, err := safePublishMessageUpdate(ctx, s, message.AddressID, update)
+ if err != nil {
+ return nil, err
+ }
+
+ if !didPublish {
+ return nil, nil
+ }
+
+ return []imap.Update{update}, nil
+}
+
+func onMessageDeleted(ctx context.Context, s *Service, event proton.MessageEvent) []imap.Update {
+ s.log.WithField("messageID", event.ID).Info("Handling message deleted event")
+
+ updates := make([]imap.Update, 0, len(s.connectors))
+
+ for _, updateCh := range maps.Values(s.connectors) {
+ update := imap.NewMessagesDeleted(imap.MessageID(event.ID))
+ updateCh.publishUpdate(ctx, update)
+ updates = append(updates, update)
+ }
+
+ return updates
+}
+
+func reportError(r reporter.Reporter, entry *logrus.Entry, title string, err error) {
+ reportErrorNoContextCancel(r, entry, title, err, reporter.Context{})
+}
+
+func reportErrorAndMessageID(r reporter.Reporter, entry *logrus.Entry, title string, err error, messgeID string) {
+ reportErrorNoContextCancel(r, entry, title, err, reporter.Context{"messageID": messgeID})
+}
+
+func reportErrorNoContextCancel(r reporter.Reporter, entry *logrus.Entry, title string, err error, reportContext reporter.Context) {
+ if !errors.Is(err, context.Canceled) {
+ reportContext["error"] = err
+ reportContext["error_type"] = internal.ErrCauseType(err)
+ if rerr := r.ReportMessageWithContext(title, reportContext); rerr != nil {
+ entry.WithError(err).WithField("title", title).Error("Failed to report message")
+ }
+ }
+}
+
+// safePublishMessageUpdate handles the rare case where the address' update channel may have been deleted in the same
+// event. This rare case can take place if in the same event fetch request there is an update for delete address and
+// create/update message.
+// If the user is in combined mode, we simply push the update to the primary address. If the user is in split mode
+// we do not publish the update as the address no longer exists.
+func safePublishMessageUpdate(ctx context.Context, s *Service, addressID string, update imap.Update) (bool, error) {
+ v, ok := s.connectors[addressID]
+ if !ok {
+ if s.addressMode == usertypes.AddressModeCombined {
+ primAddr, err := s.identityState.GetPrimaryAddress()
+ if err != nil {
+ return false, fmt.Errorf("failed to get primary address: %w", err)
+ }
+ primaryCh, ok := s.connectors[primAddr.ID]
+ if !ok {
+ return false, fmt.Errorf("primary address channel is not available")
+ }
+
+ primaryCh.publishUpdate(ctx, update)
+
+ return true, nil
+ }
+
+ logrus.Warnf("Update channel not found for address %v, it may have been already deleted", addressID)
+ _ = s.reporter.ReportMessage("Message Update channel does not exist")
+
+ return false, nil
+ }
+
+ v.publishUpdate(ctx, update)
+
+ return true, nil
+}
diff --git a/internal/services/imapservice/service_sync.go b/internal/services/imapservice/service_sync.go
new file mode 100644
index 00000000..8c21952b
--- /dev/null
+++ b/internal/services/imapservice/service_sync.go
@@ -0,0 +1,126 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ProtonMail/gluon/async"
+ "github.com/ProtonMail/gluon/imap"
+)
+
+type syncUpdate struct {
+ addrID string
+ update imap.Update
+}
+
+type syncUpdater struct {
+ ch chan syncUpdate
+}
+
+type syncUpdatePublisher struct {
+ addrID string
+ updater *syncUpdater
+}
+
+func (s *syncUpdatePublisher) publishUpdate(ctx context.Context, update imap.Update) {
+ select {
+ case <-ctx.Done():
+ update.Done(fmt.Errorf("not applied: %w", ctx.Err()))
+ return
+
+ case s.updater.ch <- syncUpdate{addrID: s.addrID, update: update}:
+ }
+}
+
+func newSyncUpdater() *syncUpdater {
+ return &syncUpdater{ch: make(chan syncUpdate)}
+}
+
+func (s *syncUpdater) createPublisher(addrID string) *syncUpdatePublisher {
+ return &syncUpdatePublisher{updater: s, addrID: addrID}
+}
+
+func (s *syncUpdater) Close() {
+ close(s.ch)
+}
+
+type syncHandler struct {
+ group *async.Group
+ updater *syncUpdater
+ syncFinishedCh chan error
+}
+
+func newSyncHandler(ctx context.Context, handler async.PanicHandler) *syncHandler {
+ return &syncHandler{
+ group: async.NewGroup(ctx, handler),
+ updater: newSyncUpdater(),
+ syncFinishedCh: make(chan error, 2),
+ }
+}
+
+func (s *syncHandler) Close() {
+ s.group.CancelAndWait()
+ close(s.syncFinishedCh)
+}
+
+func (s *syncHandler) CancelAndWait() {
+ s.group.CancelAndWait()
+}
+
+func (s *syncHandler) Cancel() {
+ s.group.Cancel()
+}
+
+func (s *syncHandler) OnSyncFinishedCH() <-chan error {
+ return s.syncFinishedCh
+}
+
+func (s *syncHandler) launch(service *Service) {
+ service.eventProvider.Pause()
+
+ labels := service.labels.GetLabelMap()
+
+ updaters := make(map[string]updatePublisher, len(service.connectors))
+
+ for _, c := range service.connectors {
+ updaters[c.addrID] = s.updater.createPublisher(c.addrID)
+ }
+
+ state := &syncJob{
+ client: service.client,
+ userID: service.identityState.UserID(),
+ labels: labels,
+ updaters: updaters,
+ addressMode: service.addressMode,
+ syncState: service.syncStateProvider,
+ eventPublisher: service.eventPublisher,
+ log: service.log,
+ // We make a copy of the identity state to avoid holding on to locks for a very long time.
+ identityState: service.identityState.Clone(),
+ panicHandler: service.panicHandler,
+ reporter: service.reporter,
+ maxSyncMemory: service.maxSyncMemory,
+ }
+
+ s.group.Once(func(ctx context.Context) {
+ err := state.run(ctx)
+ s.syncFinishedCh <- err
+ })
+}
diff --git a/internal/services/imapservice/shared_identity.go b/internal/services/imapservice/shared_identity.go
new file mode 100644
index 00000000..1d02a2c7
--- /dev/null
+++ b/internal/services/imapservice/shared_identity.go
@@ -0,0 +1,110 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "sync"
+
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/gopenpgp/v2/crypto"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
+ "golang.org/x/exp/slices"
+)
+
+type sharedIdentity interface {
+ UserID() string
+ GetAddress(id string) (proton.Address, bool)
+ GetPrimaryAddress() (proton.Address, error)
+ GetAddresses() []proton.Address
+ WithAddrKR(addrID string, fn func(userKR, addrKR *crypto.KeyRing) error) error
+ CheckAuth(email string, password []byte, telemetry Telemetry) (string, error)
+}
+
+type rwIdentity struct {
+ lock sync.RWMutex
+ identity *useridentity.State
+ bridgePassProvider useridentity.BridgePassProvider
+ keyPassProvider useridentity.KeyPassProvider
+}
+
+func (r *rwIdentity) GetPrimaryAddress() (proton.Address, error) {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return r.identity.GetPrimaryAddr()
+}
+
+func (r *rwIdentity) GetAddresses() []proton.Address {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return slices.Clone(r.identity.AddressesSorted)
+}
+
+func (r *rwIdentity) Clone() *useridentity.State {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return r.identity.Clone()
+}
+
+func newRWIdentity(identity *useridentity.State,
+ bridgePassProvider useridentity.BridgePassProvider,
+ keyPassProvider useridentity.KeyPassProvider,
+) *rwIdentity {
+ return &rwIdentity{
+ identity: identity,
+ bridgePassProvider: bridgePassProvider,
+ keyPassProvider: keyPassProvider,
+ }
+}
+
+func (r *rwIdentity) UserID() string {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return r.identity.User.ID
+}
+
+func (r *rwIdentity) GetAddress(id string) (proton.Address, bool) {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return r.identity.GetAddrByID(id)
+}
+
+func (r *rwIdentity) WithAddrKR(addrID string, fn func(userKR *crypto.KeyRing, addrKR *crypto.KeyRing) error) error {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return r.identity.WithAddrKR(addrID, r.keyPassProvider.KeyPass(), fn)
+}
+
+func (r *rwIdentity) CheckAuth(email string, password []byte, telemetry Telemetry) (string, error) {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+
+ return r.identity.CheckAuth(email, password, r.bridgePassProvider, telemetry)
+}
+
+func (r *rwIdentity) Write(f func(identity *useridentity.State) error) error {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+
+ return f(r.identity)
+}
diff --git a/internal/services/imapservice/shared_labels.go b/internal/services/imapservice/shared_labels.go
new file mode 100644
index 00000000..99982a8a
--- /dev/null
+++ b/internal/services/imapservice/shared_labels.go
@@ -0,0 +1,132 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "sync"
+
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
+ "golang.org/x/exp/maps"
+)
+
+type labelMap = map[string]proton.Label
+
+// sharedLabels holds the shared state of all the available API labels which can safely be shared among
+// all IMAP states. It's written this way to prevent issues with invalid use of the locks.
+type sharedLabels interface {
+ Read() labelsRead
+ Write() labelsWrite
+}
+
+type labelsRead interface {
+ Close()
+ GetLabel(id string) (proton.Label, bool)
+ GetLabels() []proton.Label
+}
+
+type labelsWrite interface {
+ labelsRead
+ SetLabel(id string, label proton.Label)
+ Delete(id string)
+}
+
+type rwLabels struct {
+ lock sync.RWMutex
+ labels labelMap
+}
+
+func (r *rwLabels) Read() labelsRead {
+ r.lock.RLock()
+ return &rwLabelsRead{rw: r}
+}
+
+func (r *rwLabels) Write() labelsWrite {
+ r.lock.Lock()
+ return &rwLabelsWrite{rw: r}
+}
+
+func (r *rwLabels) getLabelUnsafe(id string) (proton.Label, bool) {
+ v, ok := r.labels[id]
+
+ return v, ok
+}
+
+func (r *rwLabels) getLabelsUnsafe() []proton.Label {
+ return maps.Values(r.labels)
+}
+
+func (r *rwLabels) SetLabels(labels []proton.Label) {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+
+ r.labels = usertypes.GroupBy(labels, func(label proton.Label) string { return label.ID })
+}
+
+func (r *rwLabels) GetLabelMap() labelMap {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+
+ return maps.Clone(r.labels)
+}
+
+func newRWLabels() *rwLabels {
+ return &rwLabels{
+ labels: make(labelMap),
+ }
+}
+
+type rwLabelsRead struct {
+ rw *rwLabels
+}
+
+func (r rwLabelsRead) Close() {
+ r.rw.lock.RUnlock()
+}
+
+func (r rwLabelsRead) GetLabel(id string) (proton.Label, bool) {
+ return r.rw.getLabelUnsafe(id)
+}
+
+func (r rwLabelsRead) GetLabels() []proton.Label {
+ return r.rw.getLabelsUnsafe()
+}
+
+type rwLabelsWrite struct {
+ rw *rwLabels
+}
+
+func (r rwLabelsWrite) Close() {
+ r.rw.lock.Unlock()
+}
+
+func (r rwLabelsWrite) GetLabel(id string) (proton.Label, bool) {
+ return r.rw.getLabelUnsafe(id)
+}
+
+func (r rwLabelsWrite) GetLabels() []proton.Label {
+ return r.rw.getLabelsUnsafe()
+}
+
+func (r rwLabelsWrite) SetLabel(id string, label proton.Label) {
+ r.rw.labels[id] = label
+}
+
+func (r rwLabelsWrite) Delete(id string) {
+ delete(r.rw.labels, id)
+}
diff --git a/internal/services/imapservice/sync.go b/internal/services/imapservice/sync.go
new file mode 100644
index 00000000..b45a1e49
--- /dev/null
+++ b/internal/services/imapservice/sync.go
@@ -0,0 +1,202 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/ProtonMail/gluon/async"
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/gluon/reporter"
+ "github.com/ProtonMail/proton-bridge/v3/internal/events"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
+ "github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
+ "github.com/bradenaw/juniper/xslices"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/exp/maps"
+ "golang.org/x/exp/slices"
+)
+
+type updatePublisher interface {
+ publishUpdate(ctx context.Context, update imap.Update)
+}
+
+type syncJob struct {
+ client APIClient
+ userID string
+ labels labelMap
+ updaters map[string]updatePublisher
+ addressMode usertypes.AddressMode
+ syncState SyncStateProvider
+ eventPublisher events.EventPublisher
+ log *logrus.Entry
+ identityState *useridentity.State
+ panicHandler async.PanicHandler
+ reporter reporter.Reporter
+ maxSyncMemory uint64
+}
+
+const SyncRetryCoolDown = 20 * time.Second
+
+func (s *syncJob) run(ctx context.Context) error {
+ s.log.Info("Sync triggered")
+ s.eventPublisher.PublishEvent(ctx, events.SyncStarted{UserID: s.userID})
+
+ if s.syncState.GetSyncStatus().IsComplete() {
+ s.log.Info("Sync already complete, only system labels will be updated")
+
+ if err := s.syncSystemLabels(ctx); err != nil {
+ s.log.WithError(err).Error("Failed to sync system labels")
+ s.eventPublisher.PublishEvent(ctx, events.SyncFailed{
+ UserID: s.userID,
+ Error: err,
+ })
+ return err
+ }
+ s.eventPublisher.PublishEvent(ctx, events.SyncFinished{UserID: s.userID})
+ return nil
+ }
+
+ for {
+ if err := ctx.Err(); err != nil {
+ s.log.WithError(err).Error("Sync aborted")
+ return fmt.Errorf("sync aborted: %w", ctx.Err())
+ } else if err := s.doSync(ctx); err != nil {
+ s.log.WithError(err).Error("Failed to sync, will retry later")
+ sleepCtx(ctx, SyncRetryCoolDown)
+ } else {
+ break
+ }
+ }
+
+ return nil
+}
+
+func (s *syncJob) syncSystemLabels(ctx context.Context) error {
+ var updates []imap.Update
+
+ for _, label := range s.labels {
+ if !WantLabel(label) {
+ continue
+ }
+
+ for _, connector := range s.updaters {
+ update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name)
+ connector.publishUpdate(ctx, update)
+ updates = append(updates, update)
+ }
+ }
+
+ if err := waitOnIMAPUpdates(ctx, updates); err != nil {
+ return fmt.Errorf("could not sync system labels: %w", err)
+ }
+
+ return nil
+}
+
+func (s *syncJob) doSync(ctx context.Context) error {
+ start := time.Now()
+
+ s.log.WithField("start", start).Info("Beginning user sync")
+
+ s.eventPublisher.PublishEvent(ctx, events.SyncStarted{
+ UserID: s.userID,
+ })
+
+ if err := s.sync(ctx); err != nil {
+ s.log.WithError(err).Warn("Failed to sync user")
+
+ s.eventPublisher.PublishEvent(ctx, events.SyncFailed{
+ UserID: s.userID,
+ Error: err,
+ })
+
+ return fmt.Errorf("failed to sync: %w", err)
+ }
+
+ s.log.WithField("duration", time.Since(start)).Info("Finished user sync")
+
+ s.eventPublisher.PublishEvent(ctx, events.SyncFinished{
+ UserID: s.userID,
+ })
+
+ return nil
+}
+
+func (s *syncJob) sync(ctx context.Context) error {
+ syncStatus := s.syncState.GetSyncStatus()
+
+ if !syncStatus.HasLabels {
+ s.log.Info("Syncing labels")
+
+ if err := syncLabels(ctx, s.labels, maps.Values(s.updaters)...); err != nil {
+ return fmt.Errorf("failed to sync labels: %w", err)
+ }
+
+ if err := s.syncState.SetHasLabels(true); err != nil {
+ return fmt.Errorf("failed to set has labels: %w", err)
+ }
+
+ s.log.Info("Synced labels")
+ }
+
+ if !syncStatus.HasMessages {
+ s.log.Info("Syncing messages")
+
+ // Determine which messages to sync.
+ messageIDs, err := s.client.GetMessageIDs(ctx, "")
+ if err != nil {
+ return fmt.Errorf("failed to get message IDs to sync: %w", err)
+ }
+
+ s.log.Debugf("User has the following failed synced message ids: %v", syncStatus.FailedMessageIDs)
+
+ // Remove any messages that have already failed to sync.
+ messageIDs = xslices.Filter(messageIDs, func(messageID string) bool {
+ return !slices.Contains(syncStatus.FailedMessageIDs, messageID)
+ })
+
+ // Reverse the order of the message IDs so that the newest messages are synced first.
+ xslices.Reverse(messageIDs)
+
+ // If we have a message ID that we've already synced, then we can skip all messages before it.
+ if idx := xslices.Index(messageIDs, syncStatus.LastMessageID); idx >= 0 {
+ messageIDs = messageIDs[idx+1:]
+ }
+
+ // Sync the messages.
+ if err := s.syncMessages(
+ ctx,
+ messageIDs,
+ ); err != nil {
+ return fmt.Errorf("failed to sync messages: %w", err)
+ }
+
+ if err := s.syncState.SetHasMessages(true); err != nil {
+ return fmt.Errorf("failed to set has messages: %w", err)
+ }
+
+ s.log.Info("Synced messages")
+ } else {
+ s.log.Info("Messages are already synced, skipping")
+ }
+
+ return nil
+}
diff --git a/internal/services/imapservice/sync_attachment_downloader.go b/internal/services/imapservice/sync_attachment_downloader.go
new file mode 100644
index 00000000..5acf6631
--- /dev/null
+++ b/internal/services/imapservice/sync_attachment_downloader.go
@@ -0,0 +1,115 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+
+ "github.com/ProtonMail/gluon/async"
+ "github.com/ProtonMail/gluon/logging"
+ "github.com/ProtonMail/go-proton-api"
+)
+
+type attachmentResult struct {
+ attachment []byte
+ err error
+}
+
+type attachmentJob struct {
+ id string
+ size int64
+ result chan attachmentResult
+}
+
+type attachmentDownloader struct {
+ workerCh chan attachmentJob
+ cancel context.CancelFunc
+}
+
+func attachmentWorker(ctx context.Context, client APIClient, work <-chan attachmentJob) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case job, ok := <-work:
+ if !ok {
+ return
+ }
+ var b bytes.Buffer
+ b.Grow(int(job.size))
+ err := client.GetAttachmentInto(ctx, job.id, &b)
+ select {
+ case <-ctx.Done():
+ close(job.result)
+ return
+ case job.result <- attachmentResult{attachment: b.Bytes(), err: err}:
+ close(job.result)
+ }
+ }
+ }
+}
+
+func (s *syncJob) newAttachmentDownloader(ctx context.Context, client APIClient, workerCount int) *attachmentDownloader {
+ workerCh := make(chan attachmentJob, (workerCount+2)*workerCount)
+ ctx, cancel := context.WithCancel(ctx)
+ for i := 0; i < workerCount; i++ {
+ workerCh = make(chan attachmentJob)
+ async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { attachmentWorker(ctx, client, workerCh) }, logging.Labels{
+ "sync": fmt.Sprintf("att-downloader %v", i),
+ })
+ }
+
+ return &attachmentDownloader{
+ workerCh: workerCh,
+ cancel: cancel,
+ }
+}
+
+func (a *attachmentDownloader) getAttachments(ctx context.Context, attachments []proton.Attachment) ([][]byte, error) {
+ resultChs := make([]chan attachmentResult, len(attachments))
+ for i, id := range attachments {
+ resultChs[i] = make(chan attachmentResult, 1)
+ select {
+ case a.workerCh <- attachmentJob{id: id.ID, result: resultChs[i], size: id.Size}:
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ }
+
+ result := make([][]byte, len(attachments))
+ var err error
+ for i := 0; i < len(attachments); i++ {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case r := <-resultChs[i]:
+ if r.err != nil {
+ err = fmt.Errorf("failed to get attachment %v: %w", attachments[i], r.err)
+ }
+ result[i] = r.attachment
+ }
+ }
+
+ return result, err
+}
+
+func (a *attachmentDownloader) close() {
+ a.cancel()
+}
diff --git a/internal/services/imapservice/sync_build.go b/internal/services/imapservice/sync_build.go
new file mode 100644
index 00000000..1af8e407
--- /dev/null
+++ b/internal/services/imapservice/sync_build.go
@@ -0,0 +1,174 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "bytes"
+ "html/template"
+ "time"
+
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/gopenpgp/v2/crypto"
+ "github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
+ "github.com/ProtonMail/proton-bridge/v3/pkg/algo"
+ "github.com/ProtonMail/proton-bridge/v3/pkg/message"
+ "github.com/bradenaw/juniper/xslices"
+)
+
+type buildRes struct {
+ messageID string
+ addressID string
+ update *imap.MessageCreated
+ err error
+}
+
+func defaultMessageJobOpts() message.JobOptions {
+ return message.JobOptions{
+ IgnoreDecryptionErrors: true, // Whether to ignore decryption errors and create a "custom message" instead.
+ SanitizeDate: true, // Whether to replace all dates before 1970 with RFC822's birthdate.
+ AddInternalID: true, // Whether to include MessageID as X-Pm-Internal-Id.
+ AddExternalID: true, // Whether to include ExternalID as X-Pm-External-Id.
+ AddMessageDate: true, // Whether to include message time as X-Pm-Date.
+ AddMessageIDReference: true, // Whether to include the MessageID in References.
+ }
+}
+
+func buildRFC822(apiLabels map[string]proton.Label, full proton.FullMessage, addrKR *crypto.KeyRing, buffer *bytes.Buffer) *buildRes {
+ var (
+ update *imap.MessageCreated
+ err error
+ )
+
+ buffer.Grow(full.Size)
+
+ if buildErr := message.BuildRFC822Into(addrKR, full.Message, full.AttData, defaultMessageJobOpts(), buffer); buildErr != nil {
+ update = newMessageCreatedFailedUpdate(apiLabels, full.MessageMetadata, buildErr)
+ err = buildErr
+ } else if created, parseErr := newMessageCreatedUpdate(apiLabels, full.MessageMetadata, buffer.Bytes()); parseErr != nil {
+ update = newMessageCreatedFailedUpdate(apiLabels, full.MessageMetadata, parseErr)
+ err = parseErr
+ } else {
+ update = created
+ }
+
+ return &buildRes{
+ messageID: full.ID,
+ addressID: full.AddressID,
+ update: update,
+ err: err,
+ }
+}
+
+func newMessageCreatedUpdate(
+ apiLabels map[string]proton.Label,
+ message proton.MessageMetadata,
+ literal []byte,
+) (*imap.MessageCreated, error) {
+ parsedMessage, err := imap.NewParsedMessage(literal)
+ if err != nil {
+ return nil, err
+ }
+
+ return &imap.MessageCreated{
+ Message: toIMAPMessage(message),
+ Literal: literal,
+ MailboxIDs: usertypes.MapTo[string, imap.MailboxID](wantLabels(apiLabels, message.LabelIDs)),
+ ParsedMessage: parsedMessage,
+ }, nil
+}
+
+func newMessageCreatedFailedUpdate(
+ apiLabels map[string]proton.Label,
+ message proton.MessageMetadata,
+ err error,
+) *imap.MessageCreated {
+ literal := newFailedMessageLiteral(message.ID, time.Unix(message.Time, 0), message.Subject, err)
+
+ parsedMessage, err := imap.NewParsedMessage(literal)
+ if err != nil {
+ panic(err)
+ }
+
+ return &imap.MessageCreated{
+ Message: toIMAPMessage(message),
+ MailboxIDs: usertypes.MapTo[string, imap.MailboxID](wantLabels(apiLabels, message.LabelIDs)),
+ Literal: literal,
+ ParsedMessage: parsedMessage,
+ }
+}
+
+func newFailedMessageLiteral(
+ messageID string,
+ date time.Time,
+ subject string,
+ syncErr error,
+) []byte {
+ var buf bytes.Buffer
+
+ if tmpl, err := template.New("header").Parse(failedMessageHeaderTemplate); err != nil {
+ panic(err)
+ } else if b, err := tmplExec(tmpl, map[string]any{
+ "Date": date.In(time.UTC).Format(time.RFC822),
+ }); err != nil {
+ panic(err)
+ } else if _, err := buf.Write(b); err != nil {
+ panic(err)
+ }
+
+ if tmpl, err := template.New("body").Parse(failedMessageBodyTemplate); err != nil {
+ panic(err)
+ } else if b, err := tmplExec(tmpl, map[string]any{
+ "MessageID": messageID,
+ "Subject": subject,
+ "Error": syncErr.Error(),
+ }); err != nil {
+ panic(err)
+ } else if _, err := buf.Write(lineWrap(algo.B64Encode(b))); err != nil {
+ panic(err)
+ }
+
+ return buf.Bytes()
+}
+
+func tmplExec(template *template.Template, data any) ([]byte, error) {
+ var buf bytes.Buffer
+
+ if err := template.Execute(&buf, data); err != nil {
+ return nil, err
+ }
+
+ return buf.Bytes(), nil
+}
+
+func lineWrap(b []byte) []byte {
+ return bytes.Join(xslices.Chunk(b, 76), []byte{'\r', '\n'})
+}
+
+const failedMessageHeaderTemplate = `Date: {{.Date}}
+Subject: Message failed to build
+Content-Type: text/plain
+Content-Transfer-Encoding: base64
+
+`
+
+const failedMessageBodyTemplate = `Failed to build message:
+Subject: {{.Subject}}
+Error: {{.Error}}
+MessageID: {{.MessageID}}
+`
diff --git a/internal/services/imapservice/sync_build_test.go b/internal/services/imapservice/sync_build_test.go
new file mode 100644
index 00000000..a47e11e8
--- /dev/null
+++ b/internal/services/imapservice/sync_build_test.go
@@ -0,0 +1,80 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/gluon/rfc822"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/bradenaw/juniper/xslices"
+ "github.com/stretchr/testify/require"
+)
+
+func TestNewFailedMessageLiteral(t *testing.T) {
+ literal := newFailedMessageLiteral("abcd-efgh", time.Unix(123456789, 0), "subject", errors.New("oops"))
+
+ header, err := rfc822.Parse(literal).ParseHeader()
+ require.NoError(t, err)
+ require.Equal(t, "Message failed to build", header.Get("Subject"))
+ require.Equal(t, "29 Nov 73 21:33 UTC", header.Get("Date"))
+ require.Equal(t, "text/plain", header.Get("Content-Type"))
+ require.Equal(t, "base64", header.Get("Content-Transfer-Encoding"))
+
+ b, err := rfc822.Parse(literal).DecodedBody()
+ require.NoError(t, err)
+ require.Equal(t, string(b), "Failed to build message: \nSubject: subject\nError: oops\nMessageID: abcd-efgh\n")
+
+ parsed, err := imap.NewParsedMessage(literal)
+ require.NoError(t, err)
+ require.Equal(t, `("29 Nov 73 21:33 UTC" "Message failed to build" NIL NIL NIL NIL NIL NIL NIL NIL)`, parsed.Envelope)
+ require.Equal(t, `("text" "plain" () NIL NIL "base64" 114 2)`, parsed.Body)
+ require.Equal(t, `("text" "plain" () NIL NIL "base64" 114 2 NIL NIL NIL NIL)`, parsed.Structure)
+}
+
+func TestSyncChunkSyncBuilderBatch(t *testing.T) {
+ // GODT-2424 - Some messages were not fully built due to a bug in the chunking if the total memory used by the
+ // message would be higher than the maximum we allowed.
+ const totalMessageCount = 100
+
+ msg := proton.FullMessage{
+ Message: proton.Message{
+ Attachments: []proton.Attachment{
+ {
+ Size: int64(8 * Megabyte),
+ },
+ },
+ },
+ AttData: nil,
+ }
+
+ messages := xslices.Repeat(msg, totalMessageCount)
+
+ chunks := chunkSyncBuilderBatch(messages, 16*Megabyte)
+
+ var totalMessagesInChunks int
+
+ for _, v := range chunks {
+ totalMessagesInChunks += len(v)
+ }
+
+ require.Equal(t, totalMessagesInChunks, totalMessageCount)
+}
diff --git a/internal/services/imapservice/sync_labels.go b/internal/services/imapservice/sync_labels.go
new file mode 100644
index 00000000..d6a9468d
--- /dev/null
+++ b/internal/services/imapservice/sync_labels.go
@@ -0,0 +1,76 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/go-proton-api"
+)
+
+// nolint:exhaustive
+func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updatePublishers ...updatePublisher) error {
+ var updates []imap.Update
+
+ // Create placeholder Folders/Labels mailboxes with the \Noselect attribute.
+ for _, prefix := range []string{folderPrefix, labelPrefix} {
+ for _, updateCh := range updatePublishers {
+ update := newPlaceHolderMailboxCreatedUpdate(prefix)
+ updateCh.publishUpdate(ctx, update)
+ updates = append(updates, update)
+ }
+ }
+
+ // Sync the user's labels.
+ for labelID, label := range apiLabels {
+ if !WantLabel(label) {
+ continue
+ }
+
+ switch label.Type {
+ case proton.LabelTypeSystem:
+ for _, updateCh := range updatePublishers {
+ update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name)
+ updateCh.publishUpdate(ctx, update)
+ updates = append(updates, update)
+ }
+
+ case proton.LabelTypeFolder, proton.LabelTypeLabel:
+ for _, updateCh := range updatePublishers {
+ update := newMailboxCreatedUpdate(imap.MailboxID(labelID), GetMailboxName(label))
+ updateCh.publishUpdate(ctx, update)
+ updates = append(updates, update)
+ }
+
+ default:
+ return fmt.Errorf("unknown label type: %d", label.Type)
+ }
+ }
+
+ // Wait for all label updates to be applied.
+ for _, update := range updates {
+ err, ok := update.WaitContext(ctx)
+ if ok && err != nil {
+ return fmt.Errorf("failed to apply label create update in gluon %v: %w", update.String(), err)
+ }
+ }
+
+ return nil
+}
diff --git a/internal/services/imapservice/sync_messages.go b/internal/services/imapservice/sync_messages.go
new file mode 100644
index 00000000..40af32d3
--- /dev/null
+++ b/internal/services/imapservice/sync_messages.go
@@ -0,0 +1,523 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "os"
+ "runtime"
+ "time"
+
+ "github.com/ProtonMail/gluon/async"
+ "github.com/ProtonMail/gluon/imap"
+ "github.com/ProtonMail/gluon/logging"
+ "github.com/ProtonMail/gluon/reporter"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/gopenpgp/v2/crypto"
+ "github.com/bradenaw/juniper/parallel"
+ "github.com/bradenaw/juniper/xslices"
+ "github.com/pbnjay/memory"
+ "github.com/sirupsen/logrus"
+)
+
+func (s *syncJob) syncMessages(ctx context.Context, messageIDs []string) error {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ // Track the amount of time to process all the messages.
+ syncStartTime := time.Now()
+ defer func() { logrus.WithField("duration", time.Since(syncStartTime)).Info("Message sync completed") }()
+
+ s.log.WithFields(logrus.Fields{
+ "messages": len(messageIDs),
+ "numCPU": runtime.NumCPU(),
+ }).Info("Starting message sync")
+
+ // Create the flushers, one per update channel.
+
+ // Create a reporter to report sync progress updates.
+ syncReporter := newSyncReporter(s.userID, s.eventPublisher, len(messageIDs), time.Second)
+ defer syncReporter.done(ctx)
+
+ // Expected mem usage for this whole process should be the sum of MaxMessageBuildingMem and MaxDownloadRequestMem
+ // times x due to pipeline and all additional memory used by network requests and compression+io.
+
+ totalMemory := memory.TotalMemory()
+
+ syncLimits := newSyncLimits(s.maxSyncMemory)
+
+ if syncLimits.MaxSyncMemory >= totalMemory/2 {
+ logrus.Warnf("Requested max sync memory of %v MB is greater than half of system memory (%v MB), forcing to half of system memory",
+ toMB(syncLimits.MaxSyncMemory), toMB(totalMemory/2))
+ syncLimits.MaxSyncMemory = totalMemory / 2
+ }
+
+ if syncLimits.MaxSyncMemory < 800*Megabyte {
+ logrus.Warnf("Requested max sync memory of %v MB, but minimum recommended is 800 MB, forcing max syncMemory to 800MB", toMB(syncLimits.MaxSyncMemory))
+ syncLimits.MaxSyncMemory = 800 * Megabyte
+ }
+
+ logrus.Debugf("Total System Memory: %v", toMB(totalMemory))
+
+ // Linter says it's not used. This is a lie.
+ var syncMaxDownloadRequestMem uint64
+
+ // Linter says it's not used. This is a lie.
+ var syncMaxMessageBuildingMem uint64
+
+ // If less than 2GB available try and limit max memory to 512 MB
+ switch {
+ case syncLimits.MaxSyncMemory < 2*Gigabyte:
+ if syncLimits.MaxSyncMemory < 800*Megabyte {
+ logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes")
+ }
+ syncMaxDownloadRequestMem = syncLimits.MinDownloadRequestMem
+ syncMaxMessageBuildingMem = syncLimits.MinMessageBuildingMem
+ case syncLimits.MaxSyncMemory == 2*Gigabyte:
+ // Increasing the max download capacity has very little effect on sync speed. We could increase the download
+ // memory but the user would see less sync notifications. A smaller value here leads to more frequent
+ // updates. Additionally, most of sync time is spent in the message building.
+ syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem
+ // Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage.
+ syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem
+ default:
+ // Divide by 8 as download stage and build stage will use aprox. 4x the specified memory.
+ remainingMemory := (syncLimits.MaxSyncMemory - 2*Gigabyte) / 8
+ syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem + remainingMemory
+ syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem + remainingMemory
+ }
+
+ logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB",
+ toMB(syncMaxDownloadRequestMem),
+ toMB(syncMaxMessageBuildingMem),
+ toMB((syncMaxMessageBuildingMem*4)+(syncMaxDownloadRequestMem*4)),
+ )
+
+ downloadCh := startMetadataDownloader(ctx, s, messageIDs, syncMaxDownloadRequestMem)
+ buildCh, errorCh := startMessageDownloader(ctx, s, syncLimits, downloadCh)
+ flushCh := startMessageBuilder(ctx, s, buildCh, syncMaxMessageBuildingMem)
+ flushUpdateCh := startMessageFlusher(ctx, s, flushCh)
+
+ for flushUpdate := range flushUpdateCh {
+ if flushUpdate.err != nil {
+ return flushUpdate.err
+ }
+
+ if err := s.syncState.SetLastMessageID(flushUpdate.messageID); err != nil {
+ return fmt.Errorf("failed to set last synced message ID: %w", err)
+ }
+
+ syncReporter.add(ctx, flushUpdate.batchLen)
+ }
+
+ return <-errorCh
+}
+
+const Kilobyte = uint64(1024)
+const Megabyte = 1024 * Kilobyte
+const Gigabyte = 1024 * Megabyte
+
+func toMB(v uint64) float64 {
+ return float64(v) / float64(Megabyte)
+}
+
+type syncLimits struct {
+ MaxDownloadRequestMem uint64
+ MinDownloadRequestMem uint64
+ MaxMessageBuildingMem uint64
+ MinMessageBuildingMem uint64
+ MaxSyncMemory uint64
+ MaxParallelDownloads int
+}
+
+func newSyncLimits(maxSyncMemory uint64) syncLimits {
+ limits := syncLimits{
+ // There's no point in using more than 128MB of download data per stage, after that we reach a point of diminishing
+ // returns as we can't keep the pipeline fed fast enough.
+ MaxDownloadRequestMem: 128 * Megabyte,
+
+ // Any lower than this and we may fail to download messages.
+ MinDownloadRequestMem: 40 * Megabyte,
+
+ // This value can be increased to your hearts content. The more system memory the user has, the more messages
+ // we can build in parallel.
+ MaxMessageBuildingMem: 128 * Megabyte,
+ MinMessageBuildingMem: 64 * Megabyte,
+
+ // Maximum recommend value for parallel downloads by the API team.
+ MaxParallelDownloads: 20,
+
+ MaxSyncMemory: maxSyncMemory,
+ }
+
+ if _, ok := os.LookupEnv("BRIDGE_SYNC_FORCE_MINIMUM_SPEC"); ok {
+ logrus.Warn("Sync specs forced to minimum")
+ limits.MaxDownloadRequestMem = 50 * Megabyte
+ limits.MaxMessageBuildingMem = 80 * Megabyte
+ limits.MaxParallelDownloads = 2
+ limits.MaxSyncMemory = 800 * Megabyte
+ }
+
+ return limits
+}
+
+func chunkSyncBuilderBatch(batch []proton.FullMessage, maxMemory uint64) [][]proton.FullMessage {
+ var expectedMemUsage uint64
+ var chunks [][]proton.FullMessage
+ var lastIndex int
+ var index int
+
+ for _, v := range batch {
+ var dataSize uint64
+ for _, a := range v.Attachments {
+ dataSize += uint64(a.Size)
+ }
+
+ // 2x increase for attachment due to extra memory needed for decrypting and writing
+ // in memory buffer.
+ dataSize *= 2
+ dataSize += uint64(len(v.Body))
+
+ nextMemSize := expectedMemUsage + dataSize
+ if nextMemSize >= maxMemory {
+ chunks = append(chunks, batch[lastIndex:index])
+ lastIndex = index
+ expectedMemUsage = dataSize
+ } else {
+ expectedMemUsage = nextMemSize
+ }
+
+ index++
+ }
+
+ if lastIndex < len(batch) {
+ chunks = append(chunks, batch[lastIndex:])
+ }
+
+ return chunks
+}
+
+type flushUpdate struct {
+ messageID string
+ err error
+ batchLen int
+}
+
+type downloadRequest struct {
+ ids []string
+ expectedSize uint64
+ err error
+}
+
+type downloadedMessageBatch struct {
+ batch []proton.FullMessage
+}
+
+type builtMessageBatch struct {
+ batch []*buildRes
+}
+
+func startMetadataDownloader(ctx context.Context, s *syncJob, messageIDs []string, syncMaxDownloadRequestMem uint64) <-chan downloadRequest {
+ downloadCh := make(chan downloadRequest)
+ // Go routine in charge of downloading message metadata
+ async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) {
+ defer close(downloadCh)
+ const MetadataDataPageSize = 150
+
+ var downloadReq downloadRequest
+ downloadReq.ids = make([]string, 0, MetadataDataPageSize)
+
+ metadataChunks := xslices.Chunk(messageIDs, MetadataDataPageSize)
+ for i, metadataChunk := range metadataChunks {
+ logrus.Debugf("Metadata Request (%v of %v), previous: %v", i, len(metadataChunks), len(downloadReq.ids))
+ metadata, err := s.client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk})
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to download message metadata for chunk %v", i)
+ downloadReq.err = err
+ select {
+ case downloadCh <- downloadReq:
+ case <-ctx.Done():
+ return
+ }
+ return
+ }
+
+ if ctx.Err() != nil {
+ return
+ }
+
+ // Build look up table so that messages are processed in the same order.
+ metadataMap := make(map[string]int, len(metadata))
+ for i, v := range metadata {
+ metadataMap[v.ID] = i
+ }
+
+ for i, id := range metadataChunk {
+ m := &metadata[metadataMap[id]]
+ nextSize := downloadReq.expectedSize + uint64(m.Size)
+ if nextSize >= syncMaxDownloadRequestMem || len(downloadReq.ids) >= 256 {
+ logrus.Debugf("Download Request Sent at %v of %v", i, len(metadata))
+ select {
+ case downloadCh <- downloadReq:
+
+ case <-ctx.Done():
+ return
+ }
+ downloadReq.expectedSize = 0
+ downloadReq.ids = make([]string, 0, MetadataDataPageSize)
+ nextSize = uint64(m.Size)
+ }
+ downloadReq.ids = append(downloadReq.ids, id)
+ downloadReq.expectedSize = nextSize
+ }
+ }
+
+ if len(downloadReq.ids) != 0 {
+ logrus.Debugf("Sending remaining download request")
+ select {
+ case downloadCh <- downloadReq:
+
+ case <-ctx.Done():
+ return
+ }
+ }
+ }, logging.Labels{"sync-stage": "meta-data"})
+
+ return downloadCh
+}
+
+func startMessageDownloader(ctx context.Context, s *syncJob, syncLimits syncLimits, downloadCh <-chan downloadRequest) (<-chan downloadedMessageBatch, <-chan error) {
+ buildCh := make(chan downloadedMessageBatch)
+ errorCh := make(chan error, syncLimits.MaxParallelDownloads*4)
+
+ // Goroutine in charge of downloading and building messages in maxBatchSize batches.
+ async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) {
+ defer close(buildCh)
+ defer close(errorCh)
+ defer func() {
+ logrus.Debugf("sync downloader exit")
+ }()
+
+ attachmentDownloader := s.newAttachmentDownloader(ctx, s.client, syncLimits.MaxParallelDownloads)
+ defer attachmentDownloader.close()
+
+ for request := range downloadCh {
+ logrus.Debugf("Download request: %v MB:%v", len(request.ids), toMB(request.expectedSize))
+ if request.err != nil {
+ errorCh <- request.err
+ return
+ }
+
+ if ctx.Err() != nil {
+ errorCh <- ctx.Err()
+ return
+ }
+
+ result, err := parallel.MapContext(ctx, syncLimits.MaxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) {
+ defer async.HandlePanic(s.panicHandler)
+
+ var result proton.FullMessage
+
+ msg, err := s.client.GetMessage(ctx, id)
+ if err != nil {
+ logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message")
+ return proton.FullMessage{}, err
+ }
+
+ attachments, err := attachmentDownloader.getAttachments(ctx, msg.Attachments)
+ if err != nil {
+ logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message attachments")
+ return proton.FullMessage{}, err
+ }
+
+ result.Message = msg
+ result.AttData = attachments
+
+ return result, nil
+ })
+ if err != nil {
+ errorCh <- err
+ return
+ }
+
+ select {
+ case buildCh <- downloadedMessageBatch{
+ batch: result,
+ }:
+
+ case <-ctx.Done():
+ return
+ }
+ }
+ }, logging.Labels{"sync-stage": "download"})
+
+ return buildCh, errorCh
+}
+
+func startMessageBuilder(ctx context.Context, s *syncJob, buildCh <-chan downloadedMessageBatch, syncMaxMessageBuildingMem uint64) <-chan builtMessageBatch {
+ flushCh := make(chan builtMessageBatch)
+
+ // Goroutine which builds messages after they have been downloaded
+ async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) {
+ defer close(flushCh)
+ defer func() {
+ logrus.Debugf("sync builder exit")
+ }()
+
+ if err := s.identityState.WithAddrKRs(nil, func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error {
+ maxMessagesInParallel := runtime.NumCPU()
+
+ for buildBatch := range buildCh {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+
+ chunks := chunkSyncBuilderBatch(buildBatch.batch, syncMaxMessageBuildingMem)
+
+ for index, chunk := range chunks {
+ logrus.Debugf("Build request: %v of %v count=%v", index, len(chunks), len(chunk))
+
+ result, err := parallel.MapContext(ctx, maxMessagesInParallel, chunk, func(ctx context.Context, msg proton.FullMessage) (*buildRes, error) {
+ defer async.HandlePanic(s.panicHandler)
+
+ kr, ok := addrKRs[msg.AddressID]
+ if !ok {
+ logrus.Errorf("Address '%v' on message '%v' does not have an unlocked kerying", msg.AddressID, msg.ID)
+ return &buildRes{
+ messageID: msg.ID,
+ addressID: msg.AddressID,
+ err: fmt.Errorf("address does not have an unlocked keyring"),
+ }, nil
+ }
+
+ res := buildRFC822(s.labels, msg, kr, new(bytes.Buffer))
+ if res.err != nil {
+ s.log.WithError(res.err).WithField("msgID", msg.ID).Error("Failed to build message (syn)")
+ }
+
+ return res, nil
+ })
+ if err != nil {
+ return err
+ }
+
+ select {
+ case flushCh <- builtMessageBatch{result}:
+
+ case <-ctx.Done():
+ return nil
+ }
+ }
+ }
+
+ return nil
+ }); err != nil {
+ s.log.WithError(err).Error("Sync message builder exited with error")
+ }
+ }, logging.Labels{"sync-stage": "builder"})
+
+ return flushCh
+}
+
+func startMessageFlusher(ctx context.Context, s *syncJob, messageBatchCH <-chan builtMessageBatch) <-chan flushUpdate {
+ flushUpdateCh := make(chan flushUpdate)
+
+ // Goroutine which converts the messages into updates and builds a waitable structure for progress tracking.
+ async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) {
+ defer close(flushUpdateCh)
+ defer func() {
+ logrus.Debugf("sync flush exit")
+ }()
+
+ type updateTargetInfo struct {
+ queueIndex int
+ ch updatePublisher
+ }
+
+ pendingUpdates := make([][]*imap.MessageCreated, len(s.updaters))
+ addressToIndex := make(map[string]updateTargetInfo)
+
+ {
+ i := 0
+ for addrID, updateCh := range s.updaters {
+ addressToIndex[addrID] = updateTargetInfo{
+ ch: updateCh,
+ queueIndex: i,
+ }
+ i++
+ }
+ }
+
+ for downloadBatch := range messageBatchCH {
+ logrus.Debugf("Flush batch: %v", len(downloadBatch.batch))
+ for _, res := range downloadBatch.batch {
+ if res.err != nil {
+ if err := s.syncState.AddFailedMessageID(res.messageID); err != nil {
+ logrus.WithError(err).Error("Failed to add failed message ID")
+ }
+
+ if err := s.reporter.ReportMessageWithContext("Failed to build message (sync)", reporter.Context{
+ "messageID": res.messageID,
+ "error": res.err,
+ }); err != nil {
+ s.log.WithError(err).Error("Failed to report message build error")
+ }
+
+ // We could sync a placeholder message here, but for now we skip it entirely.
+ continue
+ }
+
+ if err := s.syncState.RemFailedMessageID(res.messageID); err != nil {
+ logrus.WithError(err).Error("Failed to remove failed message ID")
+ }
+
+ targetInfo := addressToIndex[res.addressID]
+ pendingUpdates[targetInfo.queueIndex] = append(pendingUpdates[targetInfo.queueIndex], res.update)
+ }
+
+ for _, info := range addressToIndex {
+ up := imap.NewMessagesCreated(true, pendingUpdates[info.queueIndex]...)
+ info.ch.publishUpdate(ctx, up)
+
+ err, ok := up.WaitContext(ctx)
+ if ok && err != nil {
+ flushUpdateCh <- flushUpdate{
+ err: fmt.Errorf("failed to apply sync update to gluon %v: %w", up.String(), err),
+ }
+ return
+ }
+
+ pendingUpdates[info.queueIndex] = pendingUpdates[info.queueIndex][:0]
+ }
+
+ select {
+ case flushUpdateCh <- flushUpdate{
+ messageID: downloadBatch.batch[0].messageID,
+ err: nil,
+ batchLen: len(downloadBatch.batch),
+ }:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }, logging.Labels{"sync-stage": "flush"})
+
+ return flushUpdateCh
+}
diff --git a/internal/services/imapservice/sync_reporter.go b/internal/services/imapservice/sync_reporter.go
new file mode 100644
index 00000000..7adda6ca
--- /dev/null
+++ b/internal/services/imapservice/sync_reporter.go
@@ -0,0 +1,72 @@
+// Copyright (c) 2023 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see .
+
+package imapservice
+
+import (
+ "context"
+ "time"
+
+ "github.com/ProtonMail/proton-bridge/v3/internal/events"
+)
+
+type syncReporter struct {
+ userID string
+ eventPublisher events.EventPublisher
+
+ start time.Time
+ total int
+ count int
+
+ last time.Time
+ freq time.Duration
+}
+
+func newSyncReporter(userID string, eventsPublisher events.EventPublisher, total int, freq time.Duration) *syncReporter {
+ return &syncReporter{
+ userID: userID,
+ eventPublisher: eventsPublisher,
+
+ start: time.Now(),
+ total: total,
+ freq: freq,
+ }
+}
+
+func (rep *syncReporter) add(ctx context.Context, delta int) {
+ rep.count += delta
+
+ if time.Since(rep.last) > rep.freq {
+ rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{
+ UserID: rep.userID,
+ Progress: float64(rep.count) / float64(rep.total),
+ Elapsed: time.Since(rep.start),
+ Remaining: time.Since(rep.start) * time.Duration(rep.total-(rep.count+1)) / time.Duration(rep.count+1),
+ })
+
+ rep.last = time.Now()
+ }
+}
+
+func (rep *syncReporter) done(ctx context.Context) {
+ rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{
+ UserID: rep.userID,
+ Progress: 1,
+ Elapsed: time.Since(rep.start),
+ Remaining: 0,
+ })
+}