From a187747c7c01c2a62d911a26e7f5cc55711b0e3d Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Fri, 28 Jul 2023 14:57:16 +0200 Subject: [PATCH] feat(GODT-2802): IMAP Serivce Handles all IMAP related tasks. Unlike the previous iteration, this new service automatically adds and removes users from Gluon by interfacing with server manager. --- internal/bridge/events.go | 35 + internal/services/imapservice/api_client.go | 50 ++ internal/services/imapservice/connector.go | 674 ++++++++++++++++++ internal/services/imapservice/helpers.go | 191 +++++ internal/services/imapservice/imap_updates.go | 124 ++++ .../services/imapservice/server_manager.go | 61 ++ internal/services/imapservice/service.go | 591 +++++++++++++++ .../imapservice/service_address_events.go | 166 +++++ .../imapservice/service_label_events.go | 179 +++++ .../imapservice/service_message_events.go | 346 +++++++++ internal/services/imapservice/service_sync.go | 126 ++++ .../services/imapservice/shared_identity.go | 110 +++ .../services/imapservice/shared_labels.go | 132 ++++ internal/services/imapservice/sync.go | 202 ++++++ .../imapservice/sync_attachment_downloader.go | 115 +++ internal/services/imapservice/sync_build.go | 174 +++++ .../services/imapservice/sync_build_test.go | 80 +++ internal/services/imapservice/sync_labels.go | 76 ++ .../services/imapservice/sync_messages.go | 523 ++++++++++++++ .../services/imapservice/sync_reporter.go | 72 ++ 20 files changed, 4027 insertions(+) create mode 100644 internal/bridge/events.go create mode 100644 internal/services/imapservice/api_client.go create mode 100644 internal/services/imapservice/connector.go create mode 100644 internal/services/imapservice/helpers.go create mode 100644 internal/services/imapservice/imap_updates.go create mode 100644 internal/services/imapservice/server_manager.go create mode 100644 internal/services/imapservice/service.go create mode 100644 internal/services/imapservice/service_address_events.go create mode 100644 internal/services/imapservice/service_label_events.go create mode 100644 internal/services/imapservice/service_message_events.go create mode 100644 internal/services/imapservice/service_sync.go create mode 100644 internal/services/imapservice/shared_identity.go create mode 100644 internal/services/imapservice/shared_labels.go create mode 100644 internal/services/imapservice/sync.go create mode 100644 internal/services/imapservice/sync_attachment_downloader.go create mode 100644 internal/services/imapservice/sync_build.go create mode 100644 internal/services/imapservice/sync_build_test.go create mode 100644 internal/services/imapservice/sync_labels.go create mode 100644 internal/services/imapservice/sync_messages.go create mode 100644 internal/services/imapservice/sync_reporter.go diff --git a/internal/bridge/events.go b/internal/bridge/events.go new file mode 100644 index 00000000..e74ce269 --- /dev/null +++ b/internal/bridge/events.go @@ -0,0 +1,35 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package bridge + +import ( + "github.com/ProtonMail/gluon/watcher" + "github.com/ProtonMail/proton-bridge/v3/internal/events" +) + +type bridgeEventSubscription struct { + b *Bridge +} + +func (b bridgeEventSubscription) Add(ofType ...events.Event) *watcher.Watcher[events.Event] { + return b.b.addWatcher(ofType...) +} + +func (b bridgeEventSubscription) Remove(watcher *watcher.Watcher[events.Event]) { + b.b.remWatcher(watcher) +} diff --git a/internal/services/imapservice/api_client.go b/internal/services/imapservice/api_client.go new file mode 100644 index 00000000..c3b3dbc8 --- /dev/null +++ b/internal/services/imapservice/api_client.go @@ -0,0 +1,50 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "io" + + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/gopenpgp/v2/crypto" + "github.com/bradenaw/juniper/stream" +) + +type APIClient interface { + CreateLabel(ctx context.Context, req proton.CreateLabelReq) (proton.Label, error) + GetLabel(ctx context.Context, labelID string, labelTypes ...proton.LabelType) (proton.Label, error) + UpdateLabel(ctx context.Context, labelID string, req proton.UpdateLabelReq) (proton.Label, error) + DeleteLabel(ctx context.Context, labelID string) error + LabelMessages(ctx context.Context, messageIDs []string, labelID string) error + UnlabelMessages(ctx context.Context, messageIDs []string, labelID string) error + GetLabels(ctx context.Context, labelTypes ...proton.LabelType) ([]proton.Label, error) + + GetMessage(ctx context.Context, messageID string) (proton.Message, error) + GetMessageMetadataPage(ctx context.Context, page, pageSize int, filter proton.MessageFilter) ([]proton.MessageMetadata, error) + GetMessageIDs(ctx context.Context, afterID string) ([]string, error) + CreateDraft(ctx context.Context, addrKR *crypto.KeyRing, req proton.CreateDraftReq) (proton.Message, error) + UploadAttachment(ctx context.Context, addrKR *crypto.KeyRing, req proton.CreateAttachmentReq) (proton.Attachment, error) + ImportMessages(ctx context.Context, addrKR *crypto.KeyRing, workers, buffer int, req ...proton.ImportReq) (stream.Stream[proton.ImportRes], error) + GetFullMessage(ctx context.Context, messageID string, scheduler proton.Scheduler, storageProvider proton.AttachmentAllocator) (proton.FullMessage, error) + GetAttachmentInto(ctx context.Context, attachmentID string, reader io.ReaderFrom) error + GetAttachment(ctx context.Context, attachmentID string) ([]byte, error) + DeleteMessage(ctx context.Context, messageIDs ...string) error + MarkMessagesRead(ctx context.Context, messageIDs ...string) error + MarkMessagesUnread(ctx context.Context, messageIDs ...string) error +} diff --git a/internal/services/imapservice/connector.go b/internal/services/imapservice/connector.go new file mode 100644 index 00000000..2eb102fb --- /dev/null +++ b/internal/services/imapservice/connector.go @@ -0,0 +1,674 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "bytes" + "context" + "errors" + "fmt" + "net/mail" + "sync/atomic" + "time" + + "github.com/ProtonMail/gluon/async" + "github.com/ProtonMail/gluon/connector" + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/gluon/rfc822" + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/gopenpgp/v2/crypto" + "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder" + "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" + "github.com/ProtonMail/proton-bridge/v3/pkg/message" + "github.com/ProtonMail/proton-bridge/v3/pkg/message/parser" + "github.com/bradenaw/juniper/stream" + "github.com/sirupsen/logrus" + "golang.org/x/exp/slices" +) + +// Connector contains all IMAP state required to satisfy sync and or imap queries. +type Connector struct { + addrID string + showAllMail uint32 + + flags imap.FlagSet + permFlags imap.FlagSet + attrs imap.FlagSet + + identityState sharedIdentity + client APIClient + telemetry Telemetry + panicHandler async.PanicHandler + sendRecorder *sendrecorder.SendRecorder + + addressMode usertypes.AddressMode + labels sharedLabels + updateCh *async.QueuedChannel[imap.Update] + log *logrus.Entry +} + +func NewConnector( + addrID string, + apiClient APIClient, + labels sharedLabels, + identityState sharedIdentity, + addressMode usertypes.AddressMode, + sendRecorder *sendrecorder.SendRecorder, + panicHandler async.PanicHandler, + telemetry Telemetry, + showAllMail bool, +) *Connector { + userID := identityState.UserID() + + return &Connector{ + identityState: identityState, + addrID: addrID, + showAllMail: b32(showAllMail), + flags: defaultFlags, + permFlags: defaultPermanentFlags, + attrs: defaultAttributes, + + client: apiClient, + telemetry: telemetry, + panicHandler: panicHandler, + sendRecorder: sendRecorder, + + updateCh: async.NewQueuedChannel[imap.Update]( + 0, + 0, + panicHandler, + fmt.Sprintf("connector-update-%v-%v", userID, addrID), + ), + labels: labels, + addressMode: addressMode, + log: logrus.WithFields(logrus.Fields{ + "gluon-connector": addressMode, + "addr-id": addrID, + "user-id": userID, + }), + } +} + +func (s *Connector) StateClose() { + s.log.Debug("Closing state") + s.updateCh.CloseAndDiscardQueued() +} + +func (s *Connector) Authorize(ctx context.Context, username string, password []byte) bool { + addrID, err := s.identityState.CheckAuth(username, password, s.telemetry) + if err != nil { + return false + } + + if s.addressMode == usertypes.AddressModeSplit && addrID != s.addrID { + return false + } + + s.telemetry.SendConfigStatusSuccess(ctx) + + return true +} + +func (s *Connector) CreateMailbox(ctx context.Context, name []string) (imap.Mailbox, error) { + if len(name) < 2 { + return imap.Mailbox{}, fmt.Errorf("invalid mailbox name %q: %w", name, connector.ErrOperationNotAllowed) + } + + switch name[0] { + case folderPrefix: + return s.createFolder(ctx, name[1:]) + + case labelPrefix: + return s.createLabel(ctx, name[1:]) + + default: + return imap.Mailbox{}, fmt.Errorf("invalid mailbox name %q: %w", name, connector.ErrOperationNotAllowed) + } +} + +func (s *Connector) GetMessageLiteral(ctx context.Context, id imap.MessageID) ([]byte, error) { + msg, err := s.client.GetFullMessage(ctx, string(id), usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator()) + if err != nil { + return nil, err + } + + var literal []byte + err = s.identityState.WithAddrKR(msg.AddressID, func(_, addrKR *crypto.KeyRing) error { + l, buildErr := message.BuildRFC822(addrKR, msg.Message, msg.AttData, defaultMessageJobOpts()) + if buildErr != nil { + return buildErr + } + + literal = l + + return nil + }) + + return literal, err +} + +func (s *Connector) GetMailboxVisibility(_ context.Context, mboxID imap.MailboxID) imap.MailboxVisibility { + switch mboxID { + case proton.AllMailLabel: + if atomic.LoadUint32(&s.showAllMail) != 0 { + return imap.Visible + } + return imap.Hidden + + case proton.AllScheduledLabel: + return imap.HiddenIfEmpty + default: + return imap.Visible + } +} + +func (s *Connector) UpdateMailboxName(ctx context.Context, mboxID imap.MailboxID, name []string) error { + if len(name) < 2 { + return fmt.Errorf("invalid mailbox name %q: %w", name, connector.ErrOperationNotAllowed) + } + + switch name[0] { + case folderPrefix: + return s.updateFolder(ctx, mboxID, name[1:]) + + case labelPrefix: + return s.updateLabel(ctx, mboxID, name[1:]) + + default: + return fmt.Errorf("invalid mailbox name %q: %w", name, connector.ErrOperationNotAllowed) + } +} + +func (s *Connector) DeleteMailbox(ctx context.Context, mboxID imap.MailboxID) error { + if err := s.client.DeleteLabel(ctx, string(mboxID)); err != nil { + return err + } + + wLabels := s.labels.Write() + defer wLabels.Close() + + wLabels.Delete(string(mboxID)) + + return nil +} + +func (s *Connector) CreateMessage(ctx context.Context, mailboxID imap.MailboxID, literal []byte, flags imap.FlagSet, _ time.Time) (imap.Message, []byte, error) { + if mailboxID == proton.AllMailLabel { + return imap.Message{}, nil, connector.ErrOperationNotAllowed + } + + toList, err := getLiteralToList(literal) + if err != nil { + return imap.Message{}, nil, fmt.Errorf("failed to retrieve addresses from literal:%w", err) + } + + // Compute the hash of the message (to match it against SMTP messages). + hash, err := sendrecorder.GetMessageHash(literal) + if err != nil { + return imap.Message{}, nil, err + } + + // Check if we already tried to send this message recently. + if messageID, ok, err := s.sendRecorder.HasEntryWait(ctx, hash, time.Now().Add(90*time.Second), toList); err != nil { + return imap.Message{}, nil, fmt.Errorf("failed to check send hash: %w", err) + } else if ok { + s.log.WithField("messageID", messageID).Warn("Message already sent") + + // Query the server-side message. + full, err := s.client.GetFullMessage(ctx, messageID, usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator()) + if err != nil { + return imap.Message{}, nil, fmt.Errorf("failed to fetch message: %w", err) + } + + // Build the message as it is on the server. + if err := s.identityState.WithAddrKR(full.AddressID, func(_, addrKR *crypto.KeyRing) error { + var err error + + if literal, err = message.BuildRFC822(addrKR, full.Message, full.AttData, defaultMessageJobOpts()); err != nil { + return err + } + + return nil + }); err != nil { + return imap.Message{}, nil, fmt.Errorf("failed to build message: %w", err) + } + + return toIMAPMessage(full.MessageMetadata), literal, nil + } + + wantLabelIDs := []string{string(mailboxID)} + + if flags.Contains(imap.FlagFlagged) { + wantLabelIDs = append(wantLabelIDs, proton.StarredLabel) + } + + var wantFlags proton.MessageFlag + + unread := !flags.Contains(imap.FlagSeen) + + if mailboxID != proton.DraftsLabel { + header, err := rfc822.Parse(literal).ParseHeader() + if err != nil { + return imap.Message{}, nil, err + } + + switch { + case mailboxID == proton.InboxLabel: + wantFlags = wantFlags.Add(proton.MessageFlagReceived) + + case mailboxID == proton.SentLabel: + wantFlags = wantFlags.Add(proton.MessageFlagSent) + + case header.Has("Received"): + wantFlags = wantFlags.Add(proton.MessageFlagReceived) + + default: + wantFlags = wantFlags.Add(proton.MessageFlagSent) + } + } else { + unread = false + } + + if flags.Contains(imap.FlagAnswered) { + wantFlags = wantFlags.Add(proton.MessageFlagReplied) + } + + msg, literal, err := s.importMessage(ctx, literal, wantLabelIDs, wantFlags, unread) + if err != nil { + if errors.Is(err, proton.ErrImportSizeExceeded) { + // Remap error so that Gluon does not put this message in the recovery mailbox. + err = fmt.Errorf("%v: %w", err, connector.ErrMessageSizeExceedsLimits) + } + + if apiErr := new(proton.APIError); errors.As(err, &apiErr) { + s.log.WithError(apiErr).WithField("Details", apiErr.DetailsToString()).Error("Failed to import message") + } else { + s.log.WithError(err).Error("Failed to import message") + } + } + + return msg, literal, err +} + +func (s *Connector) AddMessagesToMailbox(ctx context.Context, messageIDs []imap.MessageID, mboxID imap.MailboxID) error { + if isAllMailOrScheduled(mboxID) { + return connector.ErrOperationNotAllowed + } + + return s.client.LabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), string(mboxID)) +} + +func (s *Connector) RemoveMessagesFromMailbox(ctx context.Context, messageIDs []imap.MessageID, mboxID imap.MailboxID) error { + if isAllMailOrScheduled(mboxID) { + return connector.ErrOperationNotAllowed + } + + msgIDs := usertypes.MapTo[imap.MessageID, string](messageIDs) + if err := s.client.UnlabelMessages(ctx, msgIDs, string(mboxID)); err != nil { + return err + } + + if mboxID == proton.TrashLabel || mboxID == proton.DraftsLabel { + if err := s.client.DeleteMessage(ctx, msgIDs...); err != nil { + return err + } + } + + return nil +} + +func (s *Connector) MoveMessages(ctx context.Context, messageIDs []imap.MessageID, mboxFromID, mboxToID imap.MailboxID) (bool, error) { + if (mboxFromID == proton.InboxLabel && mboxToID == proton.SentLabel) || + (mboxFromID == proton.SentLabel && mboxToID == proton.InboxLabel) || + isAllMailOrScheduled(mboxFromID) || + isAllMailOrScheduled(mboxToID) { + return false, connector.ErrOperationNotAllowed + } + + shouldExpungeOldLocation := func() bool { + rdLabels := s.labels.Read() + defer rdLabels.Close() + + var result bool + + if v, ok := rdLabels.GetLabel(string(mboxFromID)); ok && v.Type == proton.LabelTypeLabel { + result = true + } + + if v, ok := rdLabels.GetLabel(string(mboxToID)); ok && (v.Type == proton.LabelTypeFolder || v.Type == proton.LabelTypeSystem) { + result = true + } + + return result + }() + + if err := s.client.LabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), string(mboxToID)); err != nil { + return false, fmt.Errorf("labeling messages: %w", err) + } + + if shouldExpungeOldLocation { + if err := s.client.UnlabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), string(mboxFromID)); err != nil { + return false, fmt.Errorf("unlabeling messages: %w", err) + } + } + + return shouldExpungeOldLocation, nil +} + +func (s *Connector) MarkMessagesSeen(ctx context.Context, messageIDs []imap.MessageID, seen bool) error { + if seen { + return s.client.MarkMessagesRead(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs)...) + } + + return s.client.MarkMessagesUnread(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs)...) +} + +func (s *Connector) MarkMessagesFlagged(ctx context.Context, messageIDs []imap.MessageID, flagged bool) error { + if flagged { + return s.client.LabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), proton.StarredLabel) + } + + return s.client.UnlabelMessages(ctx, usertypes.MapTo[imap.MessageID, string](messageIDs), proton.StarredLabel) +} + +func (s *Connector) GetUpdates() <-chan imap.Update { + return s.updateCh.GetChannel() +} + +func (s *Connector) Close(_ context.Context) error { + // Nothing to do + return nil +} + +func (s *Connector) ShowAllMail(v bool) { + atomic.StoreUint32(&s.showAllMail, b32(v)) +} + +var ( + defaultFlags = imap.NewFlagSet(imap.FlagSeen, imap.FlagFlagged, imap.FlagDeleted) // nolint:gochecknoglobals + defaultPermanentFlags = imap.NewFlagSet(imap.FlagSeen, imap.FlagFlagged, imap.FlagDeleted) // nolint:gochecknoglobals + defaultAttributes = imap.NewFlagSet() // nolint:gochecknoglobals +) + +const ( + folderPrefix = "Folders" + labelPrefix = "Labels" +) + +// b32 returns a uint32 0 or 1 representing b. +func b32(b bool) uint32 { + if b { + return 1 + } + + return 0 +} + +func (s *Connector) createLabel(ctx context.Context, name []string) (imap.Mailbox, error) { + if len(name) != 1 { + return imap.Mailbox{}, fmt.Errorf("a label cannot have children: %w", connector.ErrOperationNotAllowed) + } + + label, err := s.client.CreateLabel(ctx, proton.CreateLabelReq{ + Name: name[0], + Color: "#f66", + Type: proton.LabelTypeLabel, + }) + if err != nil { + return imap.Mailbox{}, err + } + + wLabels := s.labels.Write() + defer wLabels.Close() + + wLabels.SetLabel(label.ID, label) + + return toIMAPMailbox(label, s.flags, s.permFlags, s.attrs), nil +} + +func (s *Connector) createFolder(ctx context.Context, name []string) (imap.Mailbox, error) { + var parentID string + + wLabels := s.labels.Write() + defer wLabels.Close() + + if len(name) > 1 { + for _, label := range wLabels.GetLabels() { + if !slices.Equal(label.Path, name[:len(name)-1]) { + continue + } + + parentID = label.ID + + break + } + + if parentID == "" { + return imap.Mailbox{}, fmt.Errorf("parent folder %q does not exist: %w", name[:len(name)-1], connector.ErrOperationNotAllowed) + } + } + + label, err := s.client.CreateLabel(ctx, proton.CreateLabelReq{ + Name: name[len(name)-1], + Color: "#f66", + Type: proton.LabelTypeFolder, + ParentID: parentID, + }) + if err != nil { + return imap.Mailbox{}, err + } + + // Add label to list so subsequent sub folder create requests work correct. + wLabels.SetLabel(label.ID, label) + + return toIMAPMailbox(label, s.flags, s.permFlags, s.attrs), nil +} + +func (s *Connector) updateLabel(ctx context.Context, labelID imap.MailboxID, name []string) error { + if len(name) != 1 { + return fmt.Errorf("a label cannot have children: %w", connector.ErrOperationNotAllowed) + } + + label, err := s.client.GetLabel(ctx, string(labelID), proton.LabelTypeLabel) + if err != nil { + return err + } + + update, err := s.client.UpdateLabel(ctx, label.ID, proton.UpdateLabelReq{ + Name: name[0], + Color: label.Color, + }) + if err != nil { + return err + } + + wLabels := s.labels.Write() + defer wLabels.Close() + + wLabels.SetLabel(label.ID, update) + + return nil +} + +func (s *Connector) updateFolder(ctx context.Context, labelID imap.MailboxID, name []string) error { + var parentID string + + wLabels := s.labels.Write() + defer wLabels.Close() + + if len(name) > 1 { + for _, label := range wLabels.GetLabels() { + if !slices.Equal(label.Path, name[:len(name)-1]) { + continue + } + + parentID = label.ID + + break + } + + if parentID == "" { + return fmt.Errorf("parent folder %q does not exist: %w", name[:len(name)-1], connector.ErrOperationNotAllowed) + } + } + + label, err := s.client.GetLabel(ctx, string(labelID), proton.LabelTypeFolder) + if err != nil { + return err + } + + update, err := s.client.UpdateLabel(ctx, string(labelID), proton.UpdateLabelReq{ + Name: name[len(name)-1], + Color: label.Color, + ParentID: parentID, + }) + if err != nil { + return err + } + + wLabels.SetLabel(label.ID, update) + + return nil +} + +func (s *Connector) importMessage( + ctx context.Context, + literal []byte, + labelIDs []string, + flags proton.MessageFlag, + unread bool, +) (imap.Message, []byte, error) { + var full proton.FullMessage + + addr, ok := s.identityState.GetAddress(s.addrID) + if !ok { + return imap.Message{}, nil, fmt.Errorf("could not find address") + } + + if err := s.identityState.WithAddrKR(s.addrID, func(_, addrKR *crypto.KeyRing) error { + var messageID string + + if slices.Contains(labelIDs, proton.DraftsLabel) { + msg, err := s.createDraft(ctx, literal, addrKR, addr) + if err != nil { + return fmt.Errorf("failed to create draft: %w", err) + } + + // apply labels + + messageID = msg.ID + } else { + str, err := s.client.ImportMessages(ctx, addrKR, 1, 1, []proton.ImportReq{{ + Metadata: proton.ImportMetadata{ + AddressID: s.addrID, + LabelIDs: labelIDs, + Unread: proton.Bool(unread), + Flags: flags, + }, + Message: literal, + }}...) + if err != nil { + return fmt.Errorf("failed to prepare message for import: %w", err) + } + + res, err := stream.Collect(ctx, str) + if err != nil { + return fmt.Errorf("failed to import message: %w", err) + } + + messageID = res[0].MessageID + } + + var err error + + if full, err = s.client.GetFullMessage(ctx, messageID, usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator()); err != nil { + return fmt.Errorf("failed to fetch message: %w", err) + } + + if literal, err = message.BuildRFC822(addrKR, full.Message, full.AttData, defaultMessageJobOpts()); err != nil { + return fmt.Errorf("failed to build message: %w", err) + } + + return nil + }); err != nil { + return imap.Message{}, nil, err + } + + return toIMAPMessage(full.MessageMetadata), literal, nil +} + +func (s *Connector) createDraft(ctx context.Context, literal []byte, addrKR *crypto.KeyRing, sender proton.Address) (proton.Message, error) { + // Create a new message parser from the reader. + parser, err := parser.New(bytes.NewReader(literal)) + if err != nil { + return proton.Message{}, fmt.Errorf("failed to create parser: %w", err) + } + + message, err := message.ParseWithParser(parser, true) + if err != nil { + return proton.Message{}, fmt.Errorf("failed to parse message: %w", err) + } + + decBody := string(message.PlainBody) + if message.RichBody != "" { + decBody = string(message.RichBody) + } + + draft, err := s.client.CreateDraft(ctx, addrKR, proton.CreateDraftReq{ + Message: proton.DraftTemplate{ + Subject: message.Subject, + Body: decBody, + MIMEType: message.MIMEType, + + Sender: &mail.Address{Name: sender.DisplayName, Address: sender.Email}, + ToList: message.ToList, + CCList: message.CCList, + BCCList: message.BCCList, + + ExternalID: message.ExternalID, + }, + }) + + if err != nil { + return proton.Message{}, fmt.Errorf("failed to create draft: %w", err) + } + + for _, att := range message.Attachments { + disposition := proton.AttachmentDisposition + if att.Disposition == "inline" && att.ContentID != "" { + disposition = proton.InlineDisposition + } + + if _, err := s.client.UploadAttachment(ctx, addrKR, proton.CreateAttachmentReq{ + MessageID: draft.ID, + Filename: att.Name, + MIMEType: rfc822.MIMEType(att.MIMEType), + Disposition: disposition, + ContentID: att.ContentID, + Body: att.Data, + }); err != nil { + return proton.Message{}, fmt.Errorf("failed to add attachment to draft: %w", err) + } + } + + return draft, nil +} + +func (s *Connector) publishUpdate(_ context.Context, update imap.Update) { + s.updateCh.Enqueue(update) +} diff --git a/internal/services/imapservice/helpers.go b/internal/services/imapservice/helpers.go new file mode 100644 index 00000000..af9e0b3e --- /dev/null +++ b/internal/services/imapservice/helpers.go @@ -0,0 +1,191 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + "net/mail" + "time" + + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/gluon/rfc5322" + "github.com/ProtonMail/gluon/rfc822" + "github.com/ProtonMail/go-proton-api" + "github.com/bradenaw/juniper/xslices" +) + +func toIMAPMailbox(label proton.Label, flags, permFlags, attrs imap.FlagSet) imap.Mailbox { + if label.Type == proton.LabelTypeLabel { + label.Path = append([]string{labelPrefix}, label.Path...) + } else if label.Type == proton.LabelTypeFolder { + label.Path = append([]string{folderPrefix}, label.Path...) + } + + return imap.Mailbox{ + ID: imap.MailboxID(label.ID), + Name: label.Path, + Flags: flags, + PermanentFlags: permFlags, + Attributes: attrs, + } +} + +func isAllMailOrScheduled(mailboxID imap.MailboxID) bool { + return (mailboxID == proton.AllMailLabel) || (mailboxID == proton.AllScheduledLabel) +} + +func BuildFlagSetFromMessageMetadata(message proton.MessageMetadata) imap.FlagSet { + flags := imap.NewFlagSet() + + if message.Seen() { + flags.AddToSelf(imap.FlagSeen) + } + + if message.Starred() { + flags.AddToSelf(imap.FlagFlagged) + } + + if message.IsDraft() { + flags.AddToSelf(imap.FlagDraft) + } + + if message.IsRepliedAll == true || message.IsReplied == true { //nolint: gosimple + flags.AddToSelf(imap.FlagAnswered) + } + + return flags +} + +func getLiteralToList(literal []byte) ([]string, error) { + headerLiteral, _ := rfc822.Split(literal) + + header, err := rfc822.NewHeader(headerLiteral) + if err != nil { + return nil, err + } + + var result []string + + parseAddress := func(field string) error { + if fieldAddr, ok := header.GetChecked(field); ok { + addr, err := rfc5322.ParseAddressList(fieldAddr) + if err != nil { + return fmt.Errorf("failed to parse addresses for '%v': %w", field, err) + } + + result = append(result, xslices.Map(addr, func(addr *mail.Address) string { + return addr.Address + })...) + + return nil + } + + return nil + } + + if err := parseAddress("To"); err != nil { + return nil, err + } + + if err := parseAddress("Cc"); err != nil { + return nil, err + } + + if err := parseAddress("Bcc"); err != nil { + return nil, err + } + + return result, nil +} + +func toIMAPMessage(message proton.MessageMetadata) imap.Message { + flags := BuildFlagSetFromMessageMetadata(message) + + var date time.Time + + if message.Time > 0 { + date = time.Unix(message.Time, 0) + } else { + date = time.Now() + } + + return imap.Message{ + ID: imap.MessageID(message.ID), + Flags: flags, + Date: date, + } +} + +func WantLabel(label proton.Label) bool { + if label.Type != proton.LabelTypeSystem { + return true + } + + // nolint:exhaustive + switch label.ID { + case proton.InboxLabel: + return true + + case proton.TrashLabel: + return true + + case proton.SpamLabel: + return true + + case proton.AllMailLabel: + return true + + case proton.ArchiveLabel: + return true + + case proton.SentLabel: + return true + + case proton.DraftsLabel: + return true + + case proton.StarredLabel: + return true + + case proton.AllScheduledLabel: + return true + + default: + return false + } +} + +func wantLabels(apiLabels map[string]proton.Label, labelIDs []string) []string { + return xslices.Filter(labelIDs, func(labelID string) bool { + apiLabel, ok := apiLabels[labelID] + if !ok { + return false + } + + return WantLabel(apiLabel) + }) +} + +// sleepCtx sleeps for the given duration, or until the context is canceled. +func sleepCtx(ctx context.Context, d time.Duration) { + select { + case <-ctx.Done(): + case <-time.After(d): + } +} diff --git a/internal/services/imapservice/imap_updates.go b/internal/services/imapservice/imap_updates.go new file mode 100644 index 00000000..c7f297bd --- /dev/null +++ b/internal/services/imapservice/imap_updates.go @@ -0,0 +1,124 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + "strings" + + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/go-proton-api" +) + +func newSystemMailboxCreatedUpdate(labelID imap.MailboxID, labelName string) *imap.MailboxCreated { + if strings.EqualFold(labelName, imap.Inbox) { + labelName = imap.Inbox + } + + attrs := imap.NewFlagSet(imap.AttrNoInferiors) + permanentFlags := defaultPermanentFlags + flags := defaultFlags + + switch labelID { + case proton.TrashLabel: + attrs = attrs.Add(imap.AttrTrash) + + case proton.SpamLabel: + attrs = attrs.Add(imap.AttrJunk) + + case proton.AllMailLabel: + attrs = attrs.Add(imap.AttrAll) + flags = imap.NewFlagSet(imap.FlagSeen, imap.FlagFlagged) + permanentFlags = imap.NewFlagSet(imap.FlagSeen, imap.FlagFlagged) + + case proton.ArchiveLabel: + attrs = attrs.Add(imap.AttrArchive) + + case proton.SentLabel: + attrs = attrs.Add(imap.AttrSent) + + case proton.DraftsLabel: + attrs = attrs.Add(imap.AttrDrafts) + + case proton.StarredLabel: + attrs = attrs.Add(imap.AttrFlagged) + + case proton.AllScheduledLabel: + labelName = "Scheduled" // API actual name is "All Scheduled" + } + + return imap.NewMailboxCreated(imap.Mailbox{ + ID: labelID, + Name: []string{labelName}, + Flags: flags, + PermanentFlags: permanentFlags, + Attributes: attrs, + }) +} + +func waitOnIMAPUpdates(ctx context.Context, updates []imap.Update) error { + for _, update := range updates { + if err, ok := update.WaitContext(ctx); ok && err != nil { + return fmt.Errorf("failed to apply gluon update %v: %w", update.String(), err) + } + } + + return nil +} + +func newPlaceHolderMailboxCreatedUpdate(labelName string) *imap.MailboxCreated { + return imap.NewMailboxCreated(imap.Mailbox{ + ID: imap.MailboxID(labelName), + Name: []string{labelName}, + Flags: defaultFlags, + PermanentFlags: defaultPermanentFlags, + Attributes: imap.NewFlagSet(imap.AttrNoSelect), + }) +} + +func newMailboxCreatedUpdate(labelID imap.MailboxID, labelName []string) *imap.MailboxCreated { + return imap.NewMailboxCreated(imap.Mailbox{ + ID: labelID, + Name: labelName, + Flags: defaultFlags, + PermanentFlags: defaultPermanentFlags, + Attributes: imap.NewFlagSet(), + }) +} + +func GetMailboxName(label proton.Label) []string { + var name []string + + switch label.Type { + case proton.LabelTypeFolder: + name = append([]string{folderPrefix}, label.Path...) + + case proton.LabelTypeLabel: + name = append([]string{labelPrefix}, label.Path...) + + case proton.LabelTypeContactGroup: + fallthrough + case proton.LabelTypeSystem: + fallthrough + default: + name = label.Path + } + + return name +} diff --git a/internal/services/imapservice/server_manager.go b/internal/services/imapservice/server_manager.go new file mode 100644 index 00000000..2085f778 --- /dev/null +++ b/internal/services/imapservice/server_manager.go @@ -0,0 +1,61 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + + "github.com/ProtonMail/gluon/connector" +) + +type IMAPServerManager interface { + AddIMAPUser( + ctx context.Context, + connector connector.Connector, + addrID string, + idProvider GluonIDProvider, + syncStateProvider SyncStateProvider, + ) error + + RemoveIMAPUser(ctx context.Context, deleteData bool, provider GluonIDProvider, addrID ...string) error +} + +type NullIMAPServerManager struct{} + +func (n NullIMAPServerManager) AddIMAPUser( + _ context.Context, + _ connector.Connector, + _ string, + _ GluonIDProvider, + _ SyncStateProvider, +) error { + return nil +} + +func (n NullIMAPServerManager) RemoveIMAPUser( + _ context.Context, + _ bool, + _ GluonIDProvider, + _ ...string, +) error { + return nil +} + +func NewNullIMAPServerManager() *NullIMAPServerManager { + return &NullIMAPServerManager{} +} diff --git a/internal/services/imapservice/service.go b/internal/services/imapservice/service.go new file mode 100644 index 00000000..8a55bfa8 --- /dev/null +++ b/internal/services/imapservice/service.go @@ -0,0 +1,591 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + + "github.com/ProtonMail/gluon/async" + "github.com/ProtonMail/gluon/reporter" + "github.com/ProtonMail/gluon/watcher" + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/proton-bridge/v3/internal/events" + "github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks" + "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder" + "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents" + "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" + "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" + "github.com/ProtonMail/proton-bridge/v3/internal/vault" + "github.com/ProtonMail/proton-bridge/v3/pkg/cpc" + "github.com/sirupsen/logrus" +) + +type EventProvider interface { + userevents.Subscribable + userevents.EventController +} + +type Telemetry interface { + useridentity.Telemetry + SendConfigStatusSuccess(ctx context.Context) +} + +type GluonIDProvider interface { + GetGluonID(addrID string) (string, bool) + GetGluonIDs() map[string]string + SetGluonID(addrID, gluonID string) error + RemoveGluonID(addrID, gluonID string) error + GluonKey() []byte +} + +type SyncStateProvider interface { + AddFailedMessageID(messageID string) error + RemFailedMessageID(messageID string) error + GetSyncStatus() vault.SyncStatus + ClearSyncStatus() error + SetHasLabels(bool) error + SetHasMessages(bool) error + SetLastMessageID(messageID string) error +} + +type Service struct { + log *logrus.Entry + cpc *cpc.CPC + + client APIClient + identityState *rwIdentity + labels *rwLabels + addressMode usertypes.AddressMode + + refreshSubscriber *userevents.RefreshChanneledSubscriber + addressSubscriber *userevents.AddressChanneledSubscriber + userSubscriber *userevents.UserChanneledSubscriber + messageSubscriber *userevents.MessageChanneledSubscriber + labelSubscriber *userevents.LabelChanneledSubscriber + + gluonIDProvider GluonIDProvider + syncStateProvider SyncStateProvider + eventProvider EventProvider + serverManager IMAPServerManager + eventPublisher events.EventPublisher + + telemetry Telemetry + panicHandler async.PanicHandler + sendRecorder *sendrecorder.SendRecorder + reporter reporter.Reporter + + eventSubscription events.Subscription + eventWatcher *watcher.Watcher[events.Event] + connectors map[string]*Connector + maxSyncMemory uint64 + showAllMail bool +} + +func NewService( + client APIClient, + identityState *useridentity.State, + gluonIDProvider GluonIDProvider, + syncStateProvider SyncStateProvider, + eventProvider EventProvider, + serverManager IMAPServerManager, + eventPublisher events.EventPublisher, + bridgePassProvider useridentity.BridgePassProvider, + keyPassProvider useridentity.KeyPassProvider, + panicHandler async.PanicHandler, + sendRecorder *sendrecorder.SendRecorder, + telemetry Telemetry, + reporter reporter.Reporter, + addressMode usertypes.AddressMode, + subscription events.Subscription, + maxSyncMemory uint64, + showAllMail bool, +) *Service { + subscriberName := fmt.Sprintf("imap-%v", identityState.User.ID) + + return &Service{ + cpc: cpc.NewCPC(), + log: logrus.WithFields(logrus.Fields{ + "user": identityState.User.ID, + "service": "imap", + }), + client: client, + identityState: newRWIdentity(identityState, bridgePassProvider, keyPassProvider), + labels: newRWLabels(), + addressMode: addressMode, + + gluonIDProvider: gluonIDProvider, + serverManager: serverManager, + syncStateProvider: syncStateProvider, + eventProvider: eventProvider, + eventPublisher: eventPublisher, + + refreshSubscriber: userevents.NewRefreshSubscriber(subscriberName), + addressSubscriber: userevents.NewAddressSubscriber(subscriberName), + userSubscriber: userevents.NewUserSubscriber(subscriberName), + messageSubscriber: userevents.NewMessageSubscriber(subscriberName), + labelSubscriber: userevents.NewLabelSubscriber(subscriberName), + + panicHandler: panicHandler, + sendRecorder: sendRecorder, + telemetry: telemetry, + reporter: reporter, + + connectors: make(map[string]*Connector), + maxSyncMemory: maxSyncMemory, + + eventWatcher: subscription.Add(events.IMAPServerCreated{}), + eventSubscription: subscription, + showAllMail: showAllMail, + } +} + +func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error { + // Get user labels + apiLabels, err := s.client.GetLabels(ctx, proton.LabelTypeSystem, proton.LabelTypeFolder, proton.LabelTypeLabel) + if err != nil { + return fmt.Errorf("failed to get labels: %w", err) + } + + s.labels.SetLabels(apiLabels) + + { + connectors, err := s.buildConnectors() + if err != nil { + s.log.WithError(err).Error("Failed to build connectors") + return err + } + s.connectors = connectors + } + + if err := s.addConnectorsToServer(ctx, s.connectors); err != nil { + return err + } + + group.Go(ctx, s.identityState.identity.User.ID, "imap-service", s.run) + return nil +} + +func (s *Service) SetAddressMode(ctx context.Context, mode usertypes.AddressMode) error { + _, err := s.cpc.Send(ctx, &setAddressModeReq{mode: mode}) + + return err +} + +func (s *Service) Resync(ctx context.Context) error { + _, err := s.cpc.Send(ctx, &resyncReq{}) + + return err +} + +func (s *Service) CancelSync(ctx context.Context) error { + _, err := s.cpc.Send(ctx, &cancelSyncReq{}) + + return err +} + +func (s *Service) ResumeSync(ctx context.Context) error { + _, err := s.cpc.Send(ctx, &cancelSyncReq{}) + + return err +} + +func (s *Service) OnBadEvent(ctx context.Context) error { + _, err := s.cpc.Send(ctx, &onBadEventReq{}) + + return err +} + +func (s *Service) OnBadEventResync(ctx context.Context) error { + _, err := s.cpc.Send(ctx, &onBadEventResyncReq{}) + + return err +} + +func (s *Service) OnLogout(ctx context.Context) error { + _, err := s.cpc.Send(ctx, &onLogoutReq{}) + + return err +} + +func (s *Service) ShowAllMail(ctx context.Context, v bool) error { + _, err := s.cpc.Send(ctx, &showAllMailReq{v: v}) + + return err +} + +func (s *Service) GetLabels(ctx context.Context) (map[string]proton.Label, error) { + return cpc.SendTyped[map[string]proton.Label](ctx, s.cpc, &getLabelsReq{}) +} + +func (s *Service) Close() { + for _, c := range s.connectors { + c.StateClose() + } + + s.connectors = make(map[string]*Connector) +} + +func (s *Service) run(ctx context.Context) { //nolint gocyclo + s.log.Info("Starting IMAP Service") + defer s.log.Info("Exiting IMAP Service") + + defer s.cpc.Close() + defer s.eventSubscription.Remove(s.eventWatcher) + + syncHandler := newSyncHandler(ctx, s.panicHandler) + defer syncHandler.Close() + + syncHandler.launch(s) + + subscription := userevents.Subscription{ + User: s.userSubscriber, + Refresh: s.refreshSubscriber, + Address: s.addressSubscriber, + Labels: s.labelSubscriber, + Messages: s.messageSubscriber, + } + + s.eventProvider.Subscribe(subscription) + defer s.eventProvider.Unsubscribe(subscription) + + for { + select { + case <-ctx.Done(): + return + + case req, ok := <-s.cpc.ReceiveCh(): + if !ok { + continue + } + switch r := req.Value().(type) { + case *setAddressModeReq: + err := s.setAddressMode(ctx, syncHandler, r.mode) + req.Reply(ctx, nil, err) + + case *resyncReq: + s.log.Info("Received resync request, handling as refresh event") + err := s.onRefreshEvent(ctx, syncHandler) + req.Reply(ctx, nil, err) + s.log.Info("Resync reply sent, handling as refresh event") + + case *cancelSyncReq: + s.log.Info("Cancelling sync") + syncHandler.Cancel() + req.Reply(ctx, nil, nil) + + case *resumeSyncReq: + s.log.Info("Resuming sync") + // Cancel previous run, if any, just in case. + syncHandler.CancelAndWait() + syncHandler.launch(s) + req.Reply(ctx, nil, nil) + case *getLabelsReq: + labels := s.labels.GetLabelMap() + req.Reply(ctx, labels, nil) + + case *onBadEventReq: + err := s.removeConnectorsFromServer(ctx, s.connectors, false) + req.Reply(ctx, nil, err) + + case *onBadEventResyncReq: + err := s.addConnectorsToServer(ctx, s.connectors) + req.Reply(ctx, nil, err) + + case *onLogoutReq: + err := s.removeConnectorsFromServer(ctx, s.connectors, false) + req.Reply(ctx, nil, err) + + case *showAllMailReq: + req.Reply(ctx, nil, nil) + s.setShowAllMail(r.v) + + default: + s.log.Error("Received unknown request") + } + + case err, ok := <-syncHandler.OnSyncFinishedCH(): + { + if !ok { + continue + } + + if err != nil { + s.log.WithError(err).Error("Sync failed") + continue + } + + s.log.Info("Sync complete, starting API event stream") + s.eventProvider.Resume() + } + + case update, ok := <-syncHandler.updater.ch: + if !ok { + continue + } + s.onSyncUpdate(ctx, update) + + case e, ok := <-s.userSubscriber.OnEventCh(): + if !ok { + continue + } + e.Consume(func(user proton.User) error { + return s.onUserEvent(user) + }) + case e, ok := <-s.addressSubscriber.OnEventCh(): + if !ok { + continue + } + e.Consume(func(events []proton.AddressEvent) error { + return s.onAddressEvent(ctx, events) + }) + case e, ok := <-s.labelSubscriber.OnEventCh(): + if !ok { + continue + } + e.Consume(func(events []proton.LabelEvent) error { + return s.onLabelEvent(ctx, events) + }) + case e, ok := <-s.messageSubscriber.OnEventCh(): + if !ok { + continue + } + e.Consume(func(events []proton.MessageEvent) error { + return s.onMessageEvent(ctx, events) + }) + case e, ok := <-s.refreshSubscriber.OnEventCh(): + if !ok { + continue + } + e.Consume(func(_ proton.RefreshFlag) error { + return s.onRefreshEvent(ctx, syncHandler) + }) + case e, ok := <-s.eventWatcher.GetChannel(): + if !ok { + continue + } + + if _, ok := e.(events.IMAPServerCreated); ok { + if err := s.addConnectorsToServer(ctx, s.connectors); err != nil { + s.log.WithError(err).Error("Failed to add connector to server after created") + } + } + } + } +} + +func (s *Service) onRefreshEvent(ctx context.Context, handler *syncHandler) error { + s.log.Debug("handling refresh event") + + if err := s.identityState.Write(func(identity *useridentity.State) error { + return identity.OnRefreshEvent(ctx) + }); err != nil { + s.log.WithError(err).Error("Failed to apply refresh event to identity state") + return err + } + + handler.CancelAndWait() + + if err := s.removeConnectorsFromServer(ctx, s.connectors, true); err != nil { + return err + } + + if err := s.syncStateProvider.ClearSyncStatus(); err != nil { + return fmt.Errorf("failed to clear sync status:%w", err) + } + + if err := s.addConnectorsToServer(ctx, s.connectors); err != nil { + return err + } + + handler.launch(s) + + return nil +} + +func (s *Service) onUserEvent(user proton.User) error { + s.log.Debug("handling user event") + return s.identityState.Write(func(identity *useridentity.State) error { + identity.OnUserEvent(user) + return nil + }) +} + +func (s *Service) buildConnectors() (map[string]*Connector, error) { + connectors := make(map[string]*Connector) + + if s.addressMode == usertypes.AddressModeCombined { + addr, err := s.identityState.GetPrimaryAddress() + if err != nil { + return nil, fmt.Errorf("failed to build connector for combined mode: %w", err) + } + + connectors[addr.ID] = NewConnector( + addr.ID, + s.client, + s.labels, + s.identityState, + s.addressMode, + s.sendRecorder, + s.panicHandler, + s.telemetry, + s.showAllMail, + ) + + return connectors, nil + } + + for _, addr := range s.identityState.GetAddresses() { + connectors[addr.ID] = NewConnector( + addr.ID, + s.client, + s.labels, + s.identityState, + s.addressMode, + s.sendRecorder, + s.panicHandler, + s.telemetry, + s.showAllMail, + ) + } + + return connectors, nil +} + +func (s *Service) rebuildConnectors() error { + newConnectors, err := s.buildConnectors() + if err != nil { + return err + } + + for _, c := range s.connectors { + c.StateClose() + } + + s.connectors = newConnectors + + return nil +} + +func (s *Service) onSyncUpdate(ctx context.Context, syncUpdate syncUpdate) { + c, ok := s.connectors[syncUpdate.addrID] + if !ok { + s.log.Warningf("Received syncUpdate for unknown addr (%v), connector may have been removed", syncUpdate.addrID) + syncUpdate.update.Done(fmt.Errorf("undeliverable")) + return + } + + c.publishUpdate(ctx, syncUpdate.update) +} + +func (s *Service) addConnectorsToServer(ctx context.Context, connectors map[string]*Connector) error { + addedConnectors := make([]string, 0, len(connectors)) + for _, c := range connectors { + if err := s.serverManager.AddIMAPUser(ctx, c, c.addrID, s.gluonIDProvider, s.syncStateProvider); err != nil { + s.log.WithError(err).Error("Failed to add connect to imap server") + + if err := s.serverManager.RemoveIMAPUser(ctx, false, s.gluonIDProvider, addedConnectors...); err != nil { + s.log.WithError(err).Error("Failed to remove previously added connectors after failure") + } + } + addedConnectors = append(addedConnectors, c.addrID) + } + + return nil +} + +func (s *Service) removeConnectorsFromServer(ctx context.Context, connectors map[string]*Connector, deleteData bool) error { + addrIDs := make([]string, 0, len(connectors)) + + for _, c := range connectors { + addrIDs = append(addrIDs, c.addrID) + } + + if err := s.serverManager.RemoveIMAPUser(ctx, deleteData, s.gluonIDProvider, addrIDs...); err != nil { + return fmt.Errorf("failed to remove gluon users from server: %w", err) + } + + return nil +} + +func (s *Service) setAddressMode(ctx context.Context, handler *syncHandler, mode usertypes.AddressMode) error { + if s.addressMode == mode { + return nil + } + + s.addressMode = mode + if mode == usertypes.AddressModeSplit { + s.log.Info("Setting Split Address Mode") + } else { + s.log.Info("Setting Combined Address Mode") + } + + handler.CancelAndWait() + + if err := s.removeConnectorsFromServer(ctx, s.connectors, true); err != nil { + return err + } + + if err := s.syncStateProvider.ClearSyncStatus(); err != nil { + return fmt.Errorf("failed to clear sync status:%w", err) + } + + if err := s.rebuildConnectors(); err != nil { + return fmt.Errorf("failed to rebuild connectors: %w", err) + } + + if err := s.addConnectorsToServer(ctx, s.connectors); err != nil { + return err + } + + handler.launch(s) + + return nil +} + +func (s *Service) setShowAllMail(v bool) { + if s.showAllMail == v { + return + } + + s.showAllMail = v + + for _, c := range s.connectors { + c.ShowAllMail(v) + } +} + +type resyncReq struct{} + +type cancelSyncReq struct{} + +type resumeSyncReq struct{} + +type getLabelsReq struct{} + +type onBadEventReq struct{} + +type onBadEventResyncReq struct{} + +type onLogoutReq struct{} + +type showAllMailReq struct{ v bool } + +type setAddressModeReq struct { + mode usertypes.AddressMode +} diff --git a/internal/services/imapservice/service_address_events.go b/internal/services/imapservice/service_address_events.go new file mode 100644 index 00000000..71ca5c85 --- /dev/null +++ b/internal/services/imapservice/service_address_events.go @@ -0,0 +1,166 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" + "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" +) + +func (s *Service) onAddressEvent(ctx context.Context, events []proton.AddressEvent) error { + s.log.Debug("handling address event") + + if s.addressMode == usertypes.AddressModeCombined { + if err := s.identityState.Write(func(identity *useridentity.State) error { + identity.OnAddressEvents(events) + return nil + }); err != nil { + s.log.WithError(err).Error("Failed to apply address events to identity state") + return err + } + + return nil + } + + if s.addressMode != usertypes.AddressModeSplit { + return nil + } + + for _, event := range events { + switch event.Action { + case proton.EventCreate: + var status useridentity.AddressUpdate + + if err := s.identityState.Write(func(identity *useridentity.State) error { + status = identity.OnAddressCreated(event) + + return nil + }); err != nil { + return fmt.Errorf("failed to update identity state: %w", err) + } + + if status == useridentity.AddressUpdateCreated { + if err := addNewAddressSplitMode(ctx, s, event.Address.ID); err != nil { + return err + } + } + + case proton.EventUpdateFlags, proton.EventUpdate: + var addr proton.Address + var status useridentity.AddressUpdate + + if err := s.identityState.Write(func(identity *useridentity.State) error { + addr, status = identity.OnAddressUpdated(event) + + return nil + }); err != nil { + return fmt.Errorf("failed to update identity state: %w", err) + } + + // nolint:exhaustive + switch status { + case useridentity.AddressUpdateCreated: + if err := addNewAddressSplitMode(ctx, s, addr.ID); err != nil { + return err + } + + case useridentity.AddressUpdateDisabled: + if err := removeAddressSplitMode(ctx, s, addr.ID); err != nil { + return err + } + + case useridentity.AddressUpdateEnabled: + if err := addNewAddressSplitMode(ctx, s, addr.ID); err != nil { + return err + } + + default: + continue + } + + case proton.EventDelete: + var status useridentity.AddressUpdate + + if err := s.identityState.Write(func(identity *useridentity.State) error { + _, status = identity.OnAddressDeleted(event) + + return nil + }); err != nil { + return fmt.Errorf("failed to update identity state: %w", err) + } + if status == useridentity.AddressUpdateDeleted { + if err := removeAddressSplitMode(ctx, s, event.ID); err != nil { + return err + } + } + + default: + return fmt.Errorf("unknown event action: %v", event.Action) + } + } + + return nil +} + +func addNewAddressSplitMode(ctx context.Context, s *Service, addrID string) error { + connector := NewConnector( + addrID, + s.client, + s.labels, + s.identityState, + s.addressMode, + s.sendRecorder, + s.panicHandler, + s.telemetry, + s.showAllMail, + ) + + if err := s.serverManager.AddIMAPUser(ctx, connector, connector.addrID, s.gluonIDProvider, s.syncStateProvider); err != nil { + return fmt.Errorf("failed to add new account to server: %w", err) + } + + s.connectors[connector.addrID] = connector + + if err := syncLabels(ctx, s.labels.GetLabelMap(), connector); err != nil { + return fmt.Errorf("failed to sync labels for new address: %w", err) + } + + return nil +} + +func removeAddressSplitMode(ctx context.Context, s *Service, addrID string) error { + connector, ok := s.connectors[addrID] + if !ok { + s.log.Warnf("Could not find connector ") + return nil + } + + if err := s.serverManager.RemoveIMAPUser(ctx, true, s.gluonIDProvider, addrID); err != nil { + return fmt.Errorf("failed to remove user from server: %w", err) + } + + connector.StateClose() + + delete(s.connectors, addrID) + + return nil +} diff --git a/internal/services/imapservice/service_label_events.go b/internal/services/imapservice/service_label_events.go new file mode 100644 index 00000000..b9ed2889 --- /dev/null +++ b/internal/services/imapservice/service_label_events.go @@ -0,0 +1,179 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/proton-bridge/v3/internal/events" + "github.com/ProtonMail/proton-bridge/v3/internal/logging" + "github.com/bradenaw/juniper/xslices" + "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" +) + +func (s *Service) onLabelEvent(ctx context.Context, events []proton.LabelEvent) error { + s.log.Debug("handling label event") + + for _, event := range events { + switch event.Action { + case proton.EventCreate: + updates := onLabelCreated(ctx, s, event) + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + + case proton.EventUpdateFlags, proton.EventUpdate: + updates, err := onLabelUpdated(ctx, s, event) + if err != nil { + return fmt.Errorf("failed to handle update label event: %w", err) + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + + case proton.EventDelete: + updates := onLabelDeleted(ctx, s, event) + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return fmt.Errorf("failed to handle delete label event in gluon: %w", err) + } + } + } + + return nil +} + +func onLabelCreated(ctx context.Context, s *Service, event proton.LabelEvent) []imap.Update { + updates := make([]imap.Update, 0, len(s.connectors)) + + s.log.WithFields(logrus.Fields{ + "labelID": event.ID, + "name": logging.Sensitive(event.Label.Name), + }).Info("Handling label created event") + + wr := s.labels.Write() + defer wr.Close() + + wr.SetLabel(event.Label.ID, event.Label) + + for _, updateCh := range maps.Values(s.connectors) { + update := newMailboxCreatedUpdate(imap.MailboxID(event.ID), GetMailboxName(event.Label)) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + + s.eventPublisher.PublishEvent(ctx, events.UserLabelCreated{ + UserID: s.identityState.UserID(), + LabelID: event.Label.ID, + Name: event.Label.Name, + }) + + return updates +} + +func onLabelUpdated(ctx context.Context, s *Service, event proton.LabelEvent) ([]imap.Update, error) { + var updates []imap.Update + + s.log.WithFields(logrus.Fields{ + "labelID": event.ID, + "name": logging.Sensitive(event.Label.Name), + }).Info("Handling label updated event") + + stack := []proton.Label{event.Label} + + wr := s.labels.Write() + defer wr.Close() + + for len(stack) > 0 { + label := stack[0] + stack = stack[1:] + + // Only update the label if it exists; we don't want to create it as a client may have just deleted it. + if _, ok := wr.GetLabel(label.ID); ok { + wr.SetLabel(label.ID, event.Label) + } + + // API doesn't notify us that the path has changed. We need to fetch it again. + apiLabel, err := s.client.GetLabel(ctx, label.ID, label.Type) + if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status == http.StatusUnprocessableEntity { + s.log.WithError(apiErr).Warn("Failed to get label: label does not exist") + continue + } else if err != nil { + return nil, fmt.Errorf("failed to get label %q: %w", label.ID, err) + } + + // Update the label in the map. + wr.SetLabel(apiLabel.ID, apiLabel) + + // Notify the IMAP clients. + for _, updateCh := range maps.Values(s.connectors) { + update := imap.NewMailboxUpdated( + imap.MailboxID(apiLabel.ID), + GetMailboxName(apiLabel), + ) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + + s.eventPublisher.PublishEvent(ctx, events.UserLabelUpdated{ + UserID: s.identityState.UserID(), + LabelID: apiLabel.ID, + Name: apiLabel.Name, + }) + + children := xslices.Filter(wr.GetLabels(), func(other proton.Label) bool { + return other.ParentID == label.ID + }) + + stack = append(stack, children...) + } + + return updates, nil +} + +func onLabelDeleted(ctx context.Context, s *Service, event proton.LabelEvent) []imap.Update { + updates := make([]imap.Update, 0, len(s.connectors)) + + s.log.WithField("labelID", event.ID).Info("Handling label deleted event") + + for _, updateCh := range maps.Values(s.connectors) { + update := imap.NewMailboxDeleted(imap.MailboxID(event.ID)) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + + wr := s.labels.Write() + wr.Close() + + wr.Delete(event.ID) + + s.eventPublisher.PublishEvent(ctx, events.UserLabelDeleted{ + UserID: s.identityState.UserID(), + LabelID: event.ID, + }) + + return updates +} diff --git a/internal/services/imapservice/service_message_events.go b/internal/services/imapservice/service_message_events.go new file mode 100644 index 00000000..9257ffe5 --- /dev/null +++ b/internal/services/imapservice/service_message_events.go @@ -0,0 +1,346 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "bytes" + "context" + "errors" + "fmt" + "net/http" + + "github.com/ProtonMail/gluon" + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/gluon/reporter" + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/gopenpgp/v2/crypto" + "github.com/ProtonMail/proton-bridge/v3/internal" + "github.com/ProtonMail/proton-bridge/v3/internal/logging" + "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" + "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" +) + +func (s *Service) onMessageEvent(ctx context.Context, events []proton.MessageEvent) error { + s.log.Debug("handling message event") + + for _, event := range events { + ctx = logging.WithLogrusField(ctx, "messageID", event.ID) + + switch event.Action { + case proton.EventCreate: + updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message) + if err != nil { + reportError(s.reporter, s.log, "Failed to apply create message event", err) + return fmt.Errorf("failed to handle create message event: %w", err) + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + + case proton.EventUpdate, proton.EventUpdateFlags: + // Draft update means to completely remove old message and upload the new data again, but we should + // only do this if the event is of type EventUpdate otherwise label switch operations will not work. + if (event.Message.IsDraft() || (event.Message.Flags&proton.MessageFlagSent != 0)) && event.Action == proton.EventUpdate { + updates, err := onMessageUpdateDraftOrSent( + logging.WithLogrusField(ctx, "action", "update draft or sent message"), + s, + event, + ) + if err != nil { + reportError(s.reporter, s.log, "Failed to apply update draft message event", err) + return fmt.Errorf("failed to handle update draft event: %w", err) + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + + continue + } + + // GODT-2028 - Use better events here. It should be possible to have 3 separate events that refrain to + // whether the flags, labels or read only data (header+body) has been changed. This requires fixing proton + // first so that it correctly reports those cases. + // Issue regular update to handle mailboxes and flag changes. + updates, err := onMessageUpdate( + logging.WithLogrusField(ctx, "action", "update message"), + s, + event.Message, + ) + if err != nil { + reportError(s.reporter, s.log, "Failed to apply update message event", err) + return fmt.Errorf("failed to handle update message event: %w", err) + } + + // If the update fails on the gluon side because it doesn't exist, we try to create the message instead. + if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) { + s.log.WithError(err).Error("Failed to handle update message event in gluon, will try creating it") + + updates, err := onMessageCreated(ctx, s, event.Message) + if err != nil { + return fmt.Errorf("failed to handle update message event as create: %w", err) + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + } else if err != nil { + return err + } + + case proton.EventDelete: + updates := onMessageDeleted( + logging.WithLogrusField(ctx, "action", "delete message"), + s, + event, + ) + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return fmt.Errorf("failed to handle delete message event in gluon: %w", err) + } + } + } + + return nil +} + +func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) { + s.log.WithFields(logrus.Fields{ + "messageID": message.ID, + "subject": logging.Sensitive(message.Subject), + }).Info("Handling message created event") + + full, err := s.client.GetFullMessage(ctx, message.ID, usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator()) + if err != nil { + // If the message is not found, it means that it has been deleted before we could fetch it. + if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status == http.StatusUnprocessableEntity { + s.log.WithField("messageID", message.ID).Warn("Cannot create new message: full message is missing on API") + return nil, nil + } + + return nil, fmt.Errorf("failed to get full message: %w", err) + } + + var update imap.Update + + apiLabels := s.labels.GetLabelMap() + + if err := s.identityState.WithAddrKR(message.AddressID, func(_, addrKR *crypto.KeyRing) error { + res := buildRFC822(apiLabels, full, addrKR, new(bytes.Buffer)) + + if res.err != nil { + s.log.WithError(err).Error("Failed to build RFC822 message") + + if err := s.syncStateProvider.AddFailedMessageID(message.ID); err != nil { + s.log.WithError(err).Error("Failed to add failed message ID to vault") + } + + reportErrorAndMessageID(s.reporter, s.log, "Failed to build message (event create)", res.err, res.messageID) + + return nil + } + + if err := s.syncStateProvider.RemFailedMessageID(message.ID); err != nil { + s.log.WithError(err).Error("Failed to remove failed message ID from vault") + } + + update = imap.NewMessagesCreated(false, res.update) + didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update) + if err != nil { + return err + } + + if !didPublish { + update = nil + } + + return nil + }); err != nil { + return nil, err + } + + if update == nil { + return nil, nil + } + + return []imap.Update{update}, nil +} + +func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.MessageEvent) ([]imap.Update, error) { + s.log.WithFields(logrus.Fields{ + "messageID": event.ID, + "subject": logging.Sensitive(event.Message.Subject), + "isDraft": event.Message.IsDraft(), + }).Info("Handling draft or sent updated event") + + full, err := s.client.GetFullMessage(ctx, event.Message.ID, usertypes.NewProtonAPIScheduler(s.panicHandler), proton.NewDefaultAttachmentAllocator()) + if err != nil { + // If the message is not found, it means that it has been deleted before we could fetch it. + if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status == http.StatusUnprocessableEntity { + s.log.WithField("messageID", event.Message.ID).Warn("Cannot update message: full message is missing on API") + return nil, nil + } + + return nil, fmt.Errorf("failed to get full draft: %w", err) + } + + var update imap.Update + + apiLabels := s.labels.GetLabelMap() + + if err := s.identityState.WithAddrKR(event.Message.AddressID, func(_, addrKR *crypto.KeyRing) error { + res := buildRFC822(apiLabels, full, addrKR, new(bytes.Buffer)) + + if res.err != nil { + logrus.WithError(err).Error("Failed to build RFC822 message") + + if err := s.syncStateProvider.AddFailedMessageID(event.ID); err != nil { + s.log.WithError(err).Error("Failed to add failed message ID to vault") + } + + reportErrorAndMessageID(s.reporter, s.log, "Failed to build draft message (event update)", res.err, res.messageID) + + return nil + } + + if err := s.syncStateProvider.RemFailedMessageID(event.ID); err != nil { + s.log.WithError(err).Error("Failed to remove failed message ID from vault") + } + + update = imap.NewMessageUpdated( + res.update.Message, + res.update.Literal, + res.update.MailboxIDs, + res.update.ParsedMessage, + true, // Is the message doesn't exist, silently create it. + ) + + didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update) + if err != nil { + return err + } + + if !didPublish { + update = nil + } + + return nil + }); err != nil { + return nil, err + } + + if update == nil { + return nil, nil + } + + return []imap.Update{update}, nil +} + +func onMessageUpdate(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) { + s.log.WithFields(logrus.Fields{ + "messageID": message.ID, + "subject": logging.Sensitive(message.Subject), + }).Info("Handling message updated event") + + flags := BuildFlagSetFromMessageMetadata(message) + + update := imap.NewMessageMailboxesUpdated( + imap.MessageID(message.ID), + usertypes.MapTo[string, imap.MailboxID](wantLabels(s.labels.GetLabelMap(), message.LabelIDs)), + flags, + ) + + didPublish, err := safePublishMessageUpdate(ctx, s, message.AddressID, update) + if err != nil { + return nil, err + } + + if !didPublish { + return nil, nil + } + + return []imap.Update{update}, nil +} + +func onMessageDeleted(ctx context.Context, s *Service, event proton.MessageEvent) []imap.Update { + s.log.WithField("messageID", event.ID).Info("Handling message deleted event") + + updates := make([]imap.Update, 0, len(s.connectors)) + + for _, updateCh := range maps.Values(s.connectors) { + update := imap.NewMessagesDeleted(imap.MessageID(event.ID)) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + + return updates +} + +func reportError(r reporter.Reporter, entry *logrus.Entry, title string, err error) { + reportErrorNoContextCancel(r, entry, title, err, reporter.Context{}) +} + +func reportErrorAndMessageID(r reporter.Reporter, entry *logrus.Entry, title string, err error, messgeID string) { + reportErrorNoContextCancel(r, entry, title, err, reporter.Context{"messageID": messgeID}) +} + +func reportErrorNoContextCancel(r reporter.Reporter, entry *logrus.Entry, title string, err error, reportContext reporter.Context) { + if !errors.Is(err, context.Canceled) { + reportContext["error"] = err + reportContext["error_type"] = internal.ErrCauseType(err) + if rerr := r.ReportMessageWithContext(title, reportContext); rerr != nil { + entry.WithError(err).WithField("title", title).Error("Failed to report message") + } + } +} + +// safePublishMessageUpdate handles the rare case where the address' update channel may have been deleted in the same +// event. This rare case can take place if in the same event fetch request there is an update for delete address and +// create/update message. +// If the user is in combined mode, we simply push the update to the primary address. If the user is in split mode +// we do not publish the update as the address no longer exists. +func safePublishMessageUpdate(ctx context.Context, s *Service, addressID string, update imap.Update) (bool, error) { + v, ok := s.connectors[addressID] + if !ok { + if s.addressMode == usertypes.AddressModeCombined { + primAddr, err := s.identityState.GetPrimaryAddress() + if err != nil { + return false, fmt.Errorf("failed to get primary address: %w", err) + } + primaryCh, ok := s.connectors[primAddr.ID] + if !ok { + return false, fmt.Errorf("primary address channel is not available") + } + + primaryCh.publishUpdate(ctx, update) + + return true, nil + } + + logrus.Warnf("Update channel not found for address %v, it may have been already deleted", addressID) + _ = s.reporter.ReportMessage("Message Update channel does not exist") + + return false, nil + } + + v.publishUpdate(ctx, update) + + return true, nil +} diff --git a/internal/services/imapservice/service_sync.go b/internal/services/imapservice/service_sync.go new file mode 100644 index 00000000..8c21952b --- /dev/null +++ b/internal/services/imapservice/service_sync.go @@ -0,0 +1,126 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + + "github.com/ProtonMail/gluon/async" + "github.com/ProtonMail/gluon/imap" +) + +type syncUpdate struct { + addrID string + update imap.Update +} + +type syncUpdater struct { + ch chan syncUpdate +} + +type syncUpdatePublisher struct { + addrID string + updater *syncUpdater +} + +func (s *syncUpdatePublisher) publishUpdate(ctx context.Context, update imap.Update) { + select { + case <-ctx.Done(): + update.Done(fmt.Errorf("not applied: %w", ctx.Err())) + return + + case s.updater.ch <- syncUpdate{addrID: s.addrID, update: update}: + } +} + +func newSyncUpdater() *syncUpdater { + return &syncUpdater{ch: make(chan syncUpdate)} +} + +func (s *syncUpdater) createPublisher(addrID string) *syncUpdatePublisher { + return &syncUpdatePublisher{updater: s, addrID: addrID} +} + +func (s *syncUpdater) Close() { + close(s.ch) +} + +type syncHandler struct { + group *async.Group + updater *syncUpdater + syncFinishedCh chan error +} + +func newSyncHandler(ctx context.Context, handler async.PanicHandler) *syncHandler { + return &syncHandler{ + group: async.NewGroup(ctx, handler), + updater: newSyncUpdater(), + syncFinishedCh: make(chan error, 2), + } +} + +func (s *syncHandler) Close() { + s.group.CancelAndWait() + close(s.syncFinishedCh) +} + +func (s *syncHandler) CancelAndWait() { + s.group.CancelAndWait() +} + +func (s *syncHandler) Cancel() { + s.group.Cancel() +} + +func (s *syncHandler) OnSyncFinishedCH() <-chan error { + return s.syncFinishedCh +} + +func (s *syncHandler) launch(service *Service) { + service.eventProvider.Pause() + + labels := service.labels.GetLabelMap() + + updaters := make(map[string]updatePublisher, len(service.connectors)) + + for _, c := range service.connectors { + updaters[c.addrID] = s.updater.createPublisher(c.addrID) + } + + state := &syncJob{ + client: service.client, + userID: service.identityState.UserID(), + labels: labels, + updaters: updaters, + addressMode: service.addressMode, + syncState: service.syncStateProvider, + eventPublisher: service.eventPublisher, + log: service.log, + // We make a copy of the identity state to avoid holding on to locks for a very long time. + identityState: service.identityState.Clone(), + panicHandler: service.panicHandler, + reporter: service.reporter, + maxSyncMemory: service.maxSyncMemory, + } + + s.group.Once(func(ctx context.Context) { + err := state.run(ctx) + s.syncFinishedCh <- err + }) +} diff --git a/internal/services/imapservice/shared_identity.go b/internal/services/imapservice/shared_identity.go new file mode 100644 index 00000000..1d02a2c7 --- /dev/null +++ b/internal/services/imapservice/shared_identity.go @@ -0,0 +1,110 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "sync" + + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/gopenpgp/v2/crypto" + "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" + "golang.org/x/exp/slices" +) + +type sharedIdentity interface { + UserID() string + GetAddress(id string) (proton.Address, bool) + GetPrimaryAddress() (proton.Address, error) + GetAddresses() []proton.Address + WithAddrKR(addrID string, fn func(userKR, addrKR *crypto.KeyRing) error) error + CheckAuth(email string, password []byte, telemetry Telemetry) (string, error) +} + +type rwIdentity struct { + lock sync.RWMutex + identity *useridentity.State + bridgePassProvider useridentity.BridgePassProvider + keyPassProvider useridentity.KeyPassProvider +} + +func (r *rwIdentity) GetPrimaryAddress() (proton.Address, error) { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.identity.GetPrimaryAddr() +} + +func (r *rwIdentity) GetAddresses() []proton.Address { + r.lock.RLock() + defer r.lock.RUnlock() + + return slices.Clone(r.identity.AddressesSorted) +} + +func (r *rwIdentity) Clone() *useridentity.State { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.identity.Clone() +} + +func newRWIdentity(identity *useridentity.State, + bridgePassProvider useridentity.BridgePassProvider, + keyPassProvider useridentity.KeyPassProvider, +) *rwIdentity { + return &rwIdentity{ + identity: identity, + bridgePassProvider: bridgePassProvider, + keyPassProvider: keyPassProvider, + } +} + +func (r *rwIdentity) UserID() string { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.identity.User.ID +} + +func (r *rwIdentity) GetAddress(id string) (proton.Address, bool) { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.identity.GetAddrByID(id) +} + +func (r *rwIdentity) WithAddrKR(addrID string, fn func(userKR *crypto.KeyRing, addrKR *crypto.KeyRing) error) error { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.identity.WithAddrKR(addrID, r.keyPassProvider.KeyPass(), fn) +} + +func (r *rwIdentity) CheckAuth(email string, password []byte, telemetry Telemetry) (string, error) { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.identity.CheckAuth(email, password, r.bridgePassProvider, telemetry) +} + +func (r *rwIdentity) Write(f func(identity *useridentity.State) error) error { + r.lock.Lock() + defer r.lock.Unlock() + + return f(r.identity) +} diff --git a/internal/services/imapservice/shared_labels.go b/internal/services/imapservice/shared_labels.go new file mode 100644 index 00000000..99982a8a --- /dev/null +++ b/internal/services/imapservice/shared_labels.go @@ -0,0 +1,132 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "sync" + + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" + "golang.org/x/exp/maps" +) + +type labelMap = map[string]proton.Label + +// sharedLabels holds the shared state of all the available API labels which can safely be shared among +// all IMAP states. It's written this way to prevent issues with invalid use of the locks. +type sharedLabels interface { + Read() labelsRead + Write() labelsWrite +} + +type labelsRead interface { + Close() + GetLabel(id string) (proton.Label, bool) + GetLabels() []proton.Label +} + +type labelsWrite interface { + labelsRead + SetLabel(id string, label proton.Label) + Delete(id string) +} + +type rwLabels struct { + lock sync.RWMutex + labels labelMap +} + +func (r *rwLabels) Read() labelsRead { + r.lock.RLock() + return &rwLabelsRead{rw: r} +} + +func (r *rwLabels) Write() labelsWrite { + r.lock.Lock() + return &rwLabelsWrite{rw: r} +} + +func (r *rwLabels) getLabelUnsafe(id string) (proton.Label, bool) { + v, ok := r.labels[id] + + return v, ok +} + +func (r *rwLabels) getLabelsUnsafe() []proton.Label { + return maps.Values(r.labels) +} + +func (r *rwLabels) SetLabels(labels []proton.Label) { + r.lock.Lock() + defer r.lock.Unlock() + + r.labels = usertypes.GroupBy(labels, func(label proton.Label) string { return label.ID }) +} + +func (r *rwLabels) GetLabelMap() labelMap { + r.lock.Lock() + defer r.lock.Unlock() + + return maps.Clone(r.labels) +} + +func newRWLabels() *rwLabels { + return &rwLabels{ + labels: make(labelMap), + } +} + +type rwLabelsRead struct { + rw *rwLabels +} + +func (r rwLabelsRead) Close() { + r.rw.lock.RUnlock() +} + +func (r rwLabelsRead) GetLabel(id string) (proton.Label, bool) { + return r.rw.getLabelUnsafe(id) +} + +func (r rwLabelsRead) GetLabels() []proton.Label { + return r.rw.getLabelsUnsafe() +} + +type rwLabelsWrite struct { + rw *rwLabels +} + +func (r rwLabelsWrite) Close() { + r.rw.lock.Unlock() +} + +func (r rwLabelsWrite) GetLabel(id string) (proton.Label, bool) { + return r.rw.getLabelUnsafe(id) +} + +func (r rwLabelsWrite) GetLabels() []proton.Label { + return r.rw.getLabelsUnsafe() +} + +func (r rwLabelsWrite) SetLabel(id string, label proton.Label) { + r.rw.labels[id] = label +} + +func (r rwLabelsWrite) Delete(id string) { + delete(r.rw.labels, id) +} diff --git a/internal/services/imapservice/sync.go b/internal/services/imapservice/sync.go new file mode 100644 index 00000000..b45a1e49 --- /dev/null +++ b/internal/services/imapservice/sync.go @@ -0,0 +1,202 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + "time" + + "github.com/ProtonMail/gluon/async" + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/gluon/reporter" + "github.com/ProtonMail/proton-bridge/v3/internal/events" + "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" + "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" + "github.com/bradenaw/juniper/xslices" + "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" +) + +type updatePublisher interface { + publishUpdate(ctx context.Context, update imap.Update) +} + +type syncJob struct { + client APIClient + userID string + labels labelMap + updaters map[string]updatePublisher + addressMode usertypes.AddressMode + syncState SyncStateProvider + eventPublisher events.EventPublisher + log *logrus.Entry + identityState *useridentity.State + panicHandler async.PanicHandler + reporter reporter.Reporter + maxSyncMemory uint64 +} + +const SyncRetryCoolDown = 20 * time.Second + +func (s *syncJob) run(ctx context.Context) error { + s.log.Info("Sync triggered") + s.eventPublisher.PublishEvent(ctx, events.SyncStarted{UserID: s.userID}) + + if s.syncState.GetSyncStatus().IsComplete() { + s.log.Info("Sync already complete, only system labels will be updated") + + if err := s.syncSystemLabels(ctx); err != nil { + s.log.WithError(err).Error("Failed to sync system labels") + s.eventPublisher.PublishEvent(ctx, events.SyncFailed{ + UserID: s.userID, + Error: err, + }) + return err + } + s.eventPublisher.PublishEvent(ctx, events.SyncFinished{UserID: s.userID}) + return nil + } + + for { + if err := ctx.Err(); err != nil { + s.log.WithError(err).Error("Sync aborted") + return fmt.Errorf("sync aborted: %w", ctx.Err()) + } else if err := s.doSync(ctx); err != nil { + s.log.WithError(err).Error("Failed to sync, will retry later") + sleepCtx(ctx, SyncRetryCoolDown) + } else { + break + } + } + + return nil +} + +func (s *syncJob) syncSystemLabels(ctx context.Context) error { + var updates []imap.Update + + for _, label := range s.labels { + if !WantLabel(label) { + continue + } + + for _, connector := range s.updaters { + update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name) + connector.publishUpdate(ctx, update) + updates = append(updates, update) + } + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return fmt.Errorf("could not sync system labels: %w", err) + } + + return nil +} + +func (s *syncJob) doSync(ctx context.Context) error { + start := time.Now() + + s.log.WithField("start", start).Info("Beginning user sync") + + s.eventPublisher.PublishEvent(ctx, events.SyncStarted{ + UserID: s.userID, + }) + + if err := s.sync(ctx); err != nil { + s.log.WithError(err).Warn("Failed to sync user") + + s.eventPublisher.PublishEvent(ctx, events.SyncFailed{ + UserID: s.userID, + Error: err, + }) + + return fmt.Errorf("failed to sync: %w", err) + } + + s.log.WithField("duration", time.Since(start)).Info("Finished user sync") + + s.eventPublisher.PublishEvent(ctx, events.SyncFinished{ + UserID: s.userID, + }) + + return nil +} + +func (s *syncJob) sync(ctx context.Context) error { + syncStatus := s.syncState.GetSyncStatus() + + if !syncStatus.HasLabels { + s.log.Info("Syncing labels") + + if err := syncLabels(ctx, s.labels, maps.Values(s.updaters)...); err != nil { + return fmt.Errorf("failed to sync labels: %w", err) + } + + if err := s.syncState.SetHasLabels(true); err != nil { + return fmt.Errorf("failed to set has labels: %w", err) + } + + s.log.Info("Synced labels") + } + + if !syncStatus.HasMessages { + s.log.Info("Syncing messages") + + // Determine which messages to sync. + messageIDs, err := s.client.GetMessageIDs(ctx, "") + if err != nil { + return fmt.Errorf("failed to get message IDs to sync: %w", err) + } + + s.log.Debugf("User has the following failed synced message ids: %v", syncStatus.FailedMessageIDs) + + // Remove any messages that have already failed to sync. + messageIDs = xslices.Filter(messageIDs, func(messageID string) bool { + return !slices.Contains(syncStatus.FailedMessageIDs, messageID) + }) + + // Reverse the order of the message IDs so that the newest messages are synced first. + xslices.Reverse(messageIDs) + + // If we have a message ID that we've already synced, then we can skip all messages before it. + if idx := xslices.Index(messageIDs, syncStatus.LastMessageID); idx >= 0 { + messageIDs = messageIDs[idx+1:] + } + + // Sync the messages. + if err := s.syncMessages( + ctx, + messageIDs, + ); err != nil { + return fmt.Errorf("failed to sync messages: %w", err) + } + + if err := s.syncState.SetHasMessages(true); err != nil { + return fmt.Errorf("failed to set has messages: %w", err) + } + + s.log.Info("Synced messages") + } else { + s.log.Info("Messages are already synced, skipping") + } + + return nil +} diff --git a/internal/services/imapservice/sync_attachment_downloader.go b/internal/services/imapservice/sync_attachment_downloader.go new file mode 100644 index 00000000..5acf6631 --- /dev/null +++ b/internal/services/imapservice/sync_attachment_downloader.go @@ -0,0 +1,115 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "bytes" + "context" + "fmt" + + "github.com/ProtonMail/gluon/async" + "github.com/ProtonMail/gluon/logging" + "github.com/ProtonMail/go-proton-api" +) + +type attachmentResult struct { + attachment []byte + err error +} + +type attachmentJob struct { + id string + size int64 + result chan attachmentResult +} + +type attachmentDownloader struct { + workerCh chan attachmentJob + cancel context.CancelFunc +} + +func attachmentWorker(ctx context.Context, client APIClient, work <-chan attachmentJob) { + for { + select { + case <-ctx.Done(): + return + case job, ok := <-work: + if !ok { + return + } + var b bytes.Buffer + b.Grow(int(job.size)) + err := client.GetAttachmentInto(ctx, job.id, &b) + select { + case <-ctx.Done(): + close(job.result) + return + case job.result <- attachmentResult{attachment: b.Bytes(), err: err}: + close(job.result) + } + } + } +} + +func (s *syncJob) newAttachmentDownloader(ctx context.Context, client APIClient, workerCount int) *attachmentDownloader { + workerCh := make(chan attachmentJob, (workerCount+2)*workerCount) + ctx, cancel := context.WithCancel(ctx) + for i := 0; i < workerCount; i++ { + workerCh = make(chan attachmentJob) + async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { attachmentWorker(ctx, client, workerCh) }, logging.Labels{ + "sync": fmt.Sprintf("att-downloader %v", i), + }) + } + + return &attachmentDownloader{ + workerCh: workerCh, + cancel: cancel, + } +} + +func (a *attachmentDownloader) getAttachments(ctx context.Context, attachments []proton.Attachment) ([][]byte, error) { + resultChs := make([]chan attachmentResult, len(attachments)) + for i, id := range attachments { + resultChs[i] = make(chan attachmentResult, 1) + select { + case a.workerCh <- attachmentJob{id: id.ID, result: resultChs[i], size: id.Size}: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + result := make([][]byte, len(attachments)) + var err error + for i := 0; i < len(attachments); i++ { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case r := <-resultChs[i]: + if r.err != nil { + err = fmt.Errorf("failed to get attachment %v: %w", attachments[i], r.err) + } + result[i] = r.attachment + } + } + + return result, err +} + +func (a *attachmentDownloader) close() { + a.cancel() +} diff --git a/internal/services/imapservice/sync_build.go b/internal/services/imapservice/sync_build.go new file mode 100644 index 00000000..1af8e407 --- /dev/null +++ b/internal/services/imapservice/sync_build.go @@ -0,0 +1,174 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "bytes" + "html/template" + "time" + + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/gopenpgp/v2/crypto" + "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" + "github.com/ProtonMail/proton-bridge/v3/pkg/algo" + "github.com/ProtonMail/proton-bridge/v3/pkg/message" + "github.com/bradenaw/juniper/xslices" +) + +type buildRes struct { + messageID string + addressID string + update *imap.MessageCreated + err error +} + +func defaultMessageJobOpts() message.JobOptions { + return message.JobOptions{ + IgnoreDecryptionErrors: true, // Whether to ignore decryption errors and create a "custom message" instead. + SanitizeDate: true, // Whether to replace all dates before 1970 with RFC822's birthdate. + AddInternalID: true, // Whether to include MessageID as X-Pm-Internal-Id. + AddExternalID: true, // Whether to include ExternalID as X-Pm-External-Id. + AddMessageDate: true, // Whether to include message time as X-Pm-Date. + AddMessageIDReference: true, // Whether to include the MessageID in References. + } +} + +func buildRFC822(apiLabels map[string]proton.Label, full proton.FullMessage, addrKR *crypto.KeyRing, buffer *bytes.Buffer) *buildRes { + var ( + update *imap.MessageCreated + err error + ) + + buffer.Grow(full.Size) + + if buildErr := message.BuildRFC822Into(addrKR, full.Message, full.AttData, defaultMessageJobOpts(), buffer); buildErr != nil { + update = newMessageCreatedFailedUpdate(apiLabels, full.MessageMetadata, buildErr) + err = buildErr + } else if created, parseErr := newMessageCreatedUpdate(apiLabels, full.MessageMetadata, buffer.Bytes()); parseErr != nil { + update = newMessageCreatedFailedUpdate(apiLabels, full.MessageMetadata, parseErr) + err = parseErr + } else { + update = created + } + + return &buildRes{ + messageID: full.ID, + addressID: full.AddressID, + update: update, + err: err, + } +} + +func newMessageCreatedUpdate( + apiLabels map[string]proton.Label, + message proton.MessageMetadata, + literal []byte, +) (*imap.MessageCreated, error) { + parsedMessage, err := imap.NewParsedMessage(literal) + if err != nil { + return nil, err + } + + return &imap.MessageCreated{ + Message: toIMAPMessage(message), + Literal: literal, + MailboxIDs: usertypes.MapTo[string, imap.MailboxID](wantLabels(apiLabels, message.LabelIDs)), + ParsedMessage: parsedMessage, + }, nil +} + +func newMessageCreatedFailedUpdate( + apiLabels map[string]proton.Label, + message proton.MessageMetadata, + err error, +) *imap.MessageCreated { + literal := newFailedMessageLiteral(message.ID, time.Unix(message.Time, 0), message.Subject, err) + + parsedMessage, err := imap.NewParsedMessage(literal) + if err != nil { + panic(err) + } + + return &imap.MessageCreated{ + Message: toIMAPMessage(message), + MailboxIDs: usertypes.MapTo[string, imap.MailboxID](wantLabels(apiLabels, message.LabelIDs)), + Literal: literal, + ParsedMessage: parsedMessage, + } +} + +func newFailedMessageLiteral( + messageID string, + date time.Time, + subject string, + syncErr error, +) []byte { + var buf bytes.Buffer + + if tmpl, err := template.New("header").Parse(failedMessageHeaderTemplate); err != nil { + panic(err) + } else if b, err := tmplExec(tmpl, map[string]any{ + "Date": date.In(time.UTC).Format(time.RFC822), + }); err != nil { + panic(err) + } else if _, err := buf.Write(b); err != nil { + panic(err) + } + + if tmpl, err := template.New("body").Parse(failedMessageBodyTemplate); err != nil { + panic(err) + } else if b, err := tmplExec(tmpl, map[string]any{ + "MessageID": messageID, + "Subject": subject, + "Error": syncErr.Error(), + }); err != nil { + panic(err) + } else if _, err := buf.Write(lineWrap(algo.B64Encode(b))); err != nil { + panic(err) + } + + return buf.Bytes() +} + +func tmplExec(template *template.Template, data any) ([]byte, error) { + var buf bytes.Buffer + + if err := template.Execute(&buf, data); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func lineWrap(b []byte) []byte { + return bytes.Join(xslices.Chunk(b, 76), []byte{'\r', '\n'}) +} + +const failedMessageHeaderTemplate = `Date: {{.Date}} +Subject: Message failed to build +Content-Type: text/plain +Content-Transfer-Encoding: base64 + +` + +const failedMessageBodyTemplate = `Failed to build message: +Subject: {{.Subject}} +Error: {{.Error}} +MessageID: {{.MessageID}} +` diff --git a/internal/services/imapservice/sync_build_test.go b/internal/services/imapservice/sync_build_test.go new file mode 100644 index 00000000..a47e11e8 --- /dev/null +++ b/internal/services/imapservice/sync_build_test.go @@ -0,0 +1,80 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "errors" + "testing" + "time" + + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/gluon/rfc822" + "github.com/ProtonMail/go-proton-api" + "github.com/bradenaw/juniper/xslices" + "github.com/stretchr/testify/require" +) + +func TestNewFailedMessageLiteral(t *testing.T) { + literal := newFailedMessageLiteral("abcd-efgh", time.Unix(123456789, 0), "subject", errors.New("oops")) + + header, err := rfc822.Parse(literal).ParseHeader() + require.NoError(t, err) + require.Equal(t, "Message failed to build", header.Get("Subject")) + require.Equal(t, "29 Nov 73 21:33 UTC", header.Get("Date")) + require.Equal(t, "text/plain", header.Get("Content-Type")) + require.Equal(t, "base64", header.Get("Content-Transfer-Encoding")) + + b, err := rfc822.Parse(literal).DecodedBody() + require.NoError(t, err) + require.Equal(t, string(b), "Failed to build message: \nSubject: subject\nError: oops\nMessageID: abcd-efgh\n") + + parsed, err := imap.NewParsedMessage(literal) + require.NoError(t, err) + require.Equal(t, `("29 Nov 73 21:33 UTC" "Message failed to build" NIL NIL NIL NIL NIL NIL NIL NIL)`, parsed.Envelope) + require.Equal(t, `("text" "plain" () NIL NIL "base64" 114 2)`, parsed.Body) + require.Equal(t, `("text" "plain" () NIL NIL "base64" 114 2 NIL NIL NIL NIL)`, parsed.Structure) +} + +func TestSyncChunkSyncBuilderBatch(t *testing.T) { + // GODT-2424 - Some messages were not fully built due to a bug in the chunking if the total memory used by the + // message would be higher than the maximum we allowed. + const totalMessageCount = 100 + + msg := proton.FullMessage{ + Message: proton.Message{ + Attachments: []proton.Attachment{ + { + Size: int64(8 * Megabyte), + }, + }, + }, + AttData: nil, + } + + messages := xslices.Repeat(msg, totalMessageCount) + + chunks := chunkSyncBuilderBatch(messages, 16*Megabyte) + + var totalMessagesInChunks int + + for _, v := range chunks { + totalMessagesInChunks += len(v) + } + + require.Equal(t, totalMessagesInChunks, totalMessageCount) +} diff --git a/internal/services/imapservice/sync_labels.go b/internal/services/imapservice/sync_labels.go new file mode 100644 index 00000000..d6a9468d --- /dev/null +++ b/internal/services/imapservice/sync_labels.go @@ -0,0 +1,76 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/go-proton-api" +) + +// nolint:exhaustive +func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updatePublishers ...updatePublisher) error { + var updates []imap.Update + + // Create placeholder Folders/Labels mailboxes with the \Noselect attribute. + for _, prefix := range []string{folderPrefix, labelPrefix} { + for _, updateCh := range updatePublishers { + update := newPlaceHolderMailboxCreatedUpdate(prefix) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + } + + // Sync the user's labels. + for labelID, label := range apiLabels { + if !WantLabel(label) { + continue + } + + switch label.Type { + case proton.LabelTypeSystem: + for _, updateCh := range updatePublishers { + update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + + case proton.LabelTypeFolder, proton.LabelTypeLabel: + for _, updateCh := range updatePublishers { + update := newMailboxCreatedUpdate(imap.MailboxID(labelID), GetMailboxName(label)) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + + default: + return fmt.Errorf("unknown label type: %d", label.Type) + } + } + + // Wait for all label updates to be applied. + for _, update := range updates { + err, ok := update.WaitContext(ctx) + if ok && err != nil { + return fmt.Errorf("failed to apply label create update in gluon %v: %w", update.String(), err) + } + } + + return nil +} diff --git a/internal/services/imapservice/sync_messages.go b/internal/services/imapservice/sync_messages.go new file mode 100644 index 00000000..40af32d3 --- /dev/null +++ b/internal/services/imapservice/sync_messages.go @@ -0,0 +1,523 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "bytes" + "context" + "fmt" + "os" + "runtime" + "time" + + "github.com/ProtonMail/gluon/async" + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/gluon/logging" + "github.com/ProtonMail/gluon/reporter" + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/gopenpgp/v2/crypto" + "github.com/bradenaw/juniper/parallel" + "github.com/bradenaw/juniper/xslices" + "github.com/pbnjay/memory" + "github.com/sirupsen/logrus" +) + +func (s *syncJob) syncMessages(ctx context.Context, messageIDs []string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Track the amount of time to process all the messages. + syncStartTime := time.Now() + defer func() { logrus.WithField("duration", time.Since(syncStartTime)).Info("Message sync completed") }() + + s.log.WithFields(logrus.Fields{ + "messages": len(messageIDs), + "numCPU": runtime.NumCPU(), + }).Info("Starting message sync") + + // Create the flushers, one per update channel. + + // Create a reporter to report sync progress updates. + syncReporter := newSyncReporter(s.userID, s.eventPublisher, len(messageIDs), time.Second) + defer syncReporter.done(ctx) + + // Expected mem usage for this whole process should be the sum of MaxMessageBuildingMem and MaxDownloadRequestMem + // times x due to pipeline and all additional memory used by network requests and compression+io. + + totalMemory := memory.TotalMemory() + + syncLimits := newSyncLimits(s.maxSyncMemory) + + if syncLimits.MaxSyncMemory >= totalMemory/2 { + logrus.Warnf("Requested max sync memory of %v MB is greater than half of system memory (%v MB), forcing to half of system memory", + toMB(syncLimits.MaxSyncMemory), toMB(totalMemory/2)) + syncLimits.MaxSyncMemory = totalMemory / 2 + } + + if syncLimits.MaxSyncMemory < 800*Megabyte { + logrus.Warnf("Requested max sync memory of %v MB, but minimum recommended is 800 MB, forcing max syncMemory to 800MB", toMB(syncLimits.MaxSyncMemory)) + syncLimits.MaxSyncMemory = 800 * Megabyte + } + + logrus.Debugf("Total System Memory: %v", toMB(totalMemory)) + + // Linter says it's not used. This is a lie. + var syncMaxDownloadRequestMem uint64 + + // Linter says it's not used. This is a lie. + var syncMaxMessageBuildingMem uint64 + + // If less than 2GB available try and limit max memory to 512 MB + switch { + case syncLimits.MaxSyncMemory < 2*Gigabyte: + if syncLimits.MaxSyncMemory < 800*Megabyte { + logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes") + } + syncMaxDownloadRequestMem = syncLimits.MinDownloadRequestMem + syncMaxMessageBuildingMem = syncLimits.MinMessageBuildingMem + case syncLimits.MaxSyncMemory == 2*Gigabyte: + // Increasing the max download capacity has very little effect on sync speed. We could increase the download + // memory but the user would see less sync notifications. A smaller value here leads to more frequent + // updates. Additionally, most of sync time is spent in the message building. + syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem + // Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage. + syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem + default: + // Divide by 8 as download stage and build stage will use aprox. 4x the specified memory. + remainingMemory := (syncLimits.MaxSyncMemory - 2*Gigabyte) / 8 + syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem + remainingMemory + syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem + remainingMemory + } + + logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB", + toMB(syncMaxDownloadRequestMem), + toMB(syncMaxMessageBuildingMem), + toMB((syncMaxMessageBuildingMem*4)+(syncMaxDownloadRequestMem*4)), + ) + + downloadCh := startMetadataDownloader(ctx, s, messageIDs, syncMaxDownloadRequestMem) + buildCh, errorCh := startMessageDownloader(ctx, s, syncLimits, downloadCh) + flushCh := startMessageBuilder(ctx, s, buildCh, syncMaxMessageBuildingMem) + flushUpdateCh := startMessageFlusher(ctx, s, flushCh) + + for flushUpdate := range flushUpdateCh { + if flushUpdate.err != nil { + return flushUpdate.err + } + + if err := s.syncState.SetLastMessageID(flushUpdate.messageID); err != nil { + return fmt.Errorf("failed to set last synced message ID: %w", err) + } + + syncReporter.add(ctx, flushUpdate.batchLen) + } + + return <-errorCh +} + +const Kilobyte = uint64(1024) +const Megabyte = 1024 * Kilobyte +const Gigabyte = 1024 * Megabyte + +func toMB(v uint64) float64 { + return float64(v) / float64(Megabyte) +} + +type syncLimits struct { + MaxDownloadRequestMem uint64 + MinDownloadRequestMem uint64 + MaxMessageBuildingMem uint64 + MinMessageBuildingMem uint64 + MaxSyncMemory uint64 + MaxParallelDownloads int +} + +func newSyncLimits(maxSyncMemory uint64) syncLimits { + limits := syncLimits{ + // There's no point in using more than 128MB of download data per stage, after that we reach a point of diminishing + // returns as we can't keep the pipeline fed fast enough. + MaxDownloadRequestMem: 128 * Megabyte, + + // Any lower than this and we may fail to download messages. + MinDownloadRequestMem: 40 * Megabyte, + + // This value can be increased to your hearts content. The more system memory the user has, the more messages + // we can build in parallel. + MaxMessageBuildingMem: 128 * Megabyte, + MinMessageBuildingMem: 64 * Megabyte, + + // Maximum recommend value for parallel downloads by the API team. + MaxParallelDownloads: 20, + + MaxSyncMemory: maxSyncMemory, + } + + if _, ok := os.LookupEnv("BRIDGE_SYNC_FORCE_MINIMUM_SPEC"); ok { + logrus.Warn("Sync specs forced to minimum") + limits.MaxDownloadRequestMem = 50 * Megabyte + limits.MaxMessageBuildingMem = 80 * Megabyte + limits.MaxParallelDownloads = 2 + limits.MaxSyncMemory = 800 * Megabyte + } + + return limits +} + +func chunkSyncBuilderBatch(batch []proton.FullMessage, maxMemory uint64) [][]proton.FullMessage { + var expectedMemUsage uint64 + var chunks [][]proton.FullMessage + var lastIndex int + var index int + + for _, v := range batch { + var dataSize uint64 + for _, a := range v.Attachments { + dataSize += uint64(a.Size) + } + + // 2x increase for attachment due to extra memory needed for decrypting and writing + // in memory buffer. + dataSize *= 2 + dataSize += uint64(len(v.Body)) + + nextMemSize := expectedMemUsage + dataSize + if nextMemSize >= maxMemory { + chunks = append(chunks, batch[lastIndex:index]) + lastIndex = index + expectedMemUsage = dataSize + } else { + expectedMemUsage = nextMemSize + } + + index++ + } + + if lastIndex < len(batch) { + chunks = append(chunks, batch[lastIndex:]) + } + + return chunks +} + +type flushUpdate struct { + messageID string + err error + batchLen int +} + +type downloadRequest struct { + ids []string + expectedSize uint64 + err error +} + +type downloadedMessageBatch struct { + batch []proton.FullMessage +} + +type builtMessageBatch struct { + batch []*buildRes +} + +func startMetadataDownloader(ctx context.Context, s *syncJob, messageIDs []string, syncMaxDownloadRequestMem uint64) <-chan downloadRequest { + downloadCh := make(chan downloadRequest) + // Go routine in charge of downloading message metadata + async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { + defer close(downloadCh) + const MetadataDataPageSize = 150 + + var downloadReq downloadRequest + downloadReq.ids = make([]string, 0, MetadataDataPageSize) + + metadataChunks := xslices.Chunk(messageIDs, MetadataDataPageSize) + for i, metadataChunk := range metadataChunks { + logrus.Debugf("Metadata Request (%v of %v), previous: %v", i, len(metadataChunks), len(downloadReq.ids)) + metadata, err := s.client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk}) + if err != nil { + logrus.WithError(err).Errorf("Failed to download message metadata for chunk %v", i) + downloadReq.err = err + select { + case downloadCh <- downloadReq: + case <-ctx.Done(): + return + } + return + } + + if ctx.Err() != nil { + return + } + + // Build look up table so that messages are processed in the same order. + metadataMap := make(map[string]int, len(metadata)) + for i, v := range metadata { + metadataMap[v.ID] = i + } + + for i, id := range metadataChunk { + m := &metadata[metadataMap[id]] + nextSize := downloadReq.expectedSize + uint64(m.Size) + if nextSize >= syncMaxDownloadRequestMem || len(downloadReq.ids) >= 256 { + logrus.Debugf("Download Request Sent at %v of %v", i, len(metadata)) + select { + case downloadCh <- downloadReq: + + case <-ctx.Done(): + return + } + downloadReq.expectedSize = 0 + downloadReq.ids = make([]string, 0, MetadataDataPageSize) + nextSize = uint64(m.Size) + } + downloadReq.ids = append(downloadReq.ids, id) + downloadReq.expectedSize = nextSize + } + } + + if len(downloadReq.ids) != 0 { + logrus.Debugf("Sending remaining download request") + select { + case downloadCh <- downloadReq: + + case <-ctx.Done(): + return + } + } + }, logging.Labels{"sync-stage": "meta-data"}) + + return downloadCh +} + +func startMessageDownloader(ctx context.Context, s *syncJob, syncLimits syncLimits, downloadCh <-chan downloadRequest) (<-chan downloadedMessageBatch, <-chan error) { + buildCh := make(chan downloadedMessageBatch) + errorCh := make(chan error, syncLimits.MaxParallelDownloads*4) + + // Goroutine in charge of downloading and building messages in maxBatchSize batches. + async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { + defer close(buildCh) + defer close(errorCh) + defer func() { + logrus.Debugf("sync downloader exit") + }() + + attachmentDownloader := s.newAttachmentDownloader(ctx, s.client, syncLimits.MaxParallelDownloads) + defer attachmentDownloader.close() + + for request := range downloadCh { + logrus.Debugf("Download request: %v MB:%v", len(request.ids), toMB(request.expectedSize)) + if request.err != nil { + errorCh <- request.err + return + } + + if ctx.Err() != nil { + errorCh <- ctx.Err() + return + } + + result, err := parallel.MapContext(ctx, syncLimits.MaxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) { + defer async.HandlePanic(s.panicHandler) + + var result proton.FullMessage + + msg, err := s.client.GetMessage(ctx, id) + if err != nil { + logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message") + return proton.FullMessage{}, err + } + + attachments, err := attachmentDownloader.getAttachments(ctx, msg.Attachments) + if err != nil { + logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message attachments") + return proton.FullMessage{}, err + } + + result.Message = msg + result.AttData = attachments + + return result, nil + }) + if err != nil { + errorCh <- err + return + } + + select { + case buildCh <- downloadedMessageBatch{ + batch: result, + }: + + case <-ctx.Done(): + return + } + } + }, logging.Labels{"sync-stage": "download"}) + + return buildCh, errorCh +} + +func startMessageBuilder(ctx context.Context, s *syncJob, buildCh <-chan downloadedMessageBatch, syncMaxMessageBuildingMem uint64) <-chan builtMessageBatch { + flushCh := make(chan builtMessageBatch) + + // Goroutine which builds messages after they have been downloaded + async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { + defer close(flushCh) + defer func() { + logrus.Debugf("sync builder exit") + }() + + if err := s.identityState.WithAddrKRs(nil, func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error { + maxMessagesInParallel := runtime.NumCPU() + + for buildBatch := range buildCh { + if ctx.Err() != nil { + return ctx.Err() + } + + chunks := chunkSyncBuilderBatch(buildBatch.batch, syncMaxMessageBuildingMem) + + for index, chunk := range chunks { + logrus.Debugf("Build request: %v of %v count=%v", index, len(chunks), len(chunk)) + + result, err := parallel.MapContext(ctx, maxMessagesInParallel, chunk, func(ctx context.Context, msg proton.FullMessage) (*buildRes, error) { + defer async.HandlePanic(s.panicHandler) + + kr, ok := addrKRs[msg.AddressID] + if !ok { + logrus.Errorf("Address '%v' on message '%v' does not have an unlocked kerying", msg.AddressID, msg.ID) + return &buildRes{ + messageID: msg.ID, + addressID: msg.AddressID, + err: fmt.Errorf("address does not have an unlocked keyring"), + }, nil + } + + res := buildRFC822(s.labels, msg, kr, new(bytes.Buffer)) + if res.err != nil { + s.log.WithError(res.err).WithField("msgID", msg.ID).Error("Failed to build message (syn)") + } + + return res, nil + }) + if err != nil { + return err + } + + select { + case flushCh <- builtMessageBatch{result}: + + case <-ctx.Done(): + return nil + } + } + } + + return nil + }); err != nil { + s.log.WithError(err).Error("Sync message builder exited with error") + } + }, logging.Labels{"sync-stage": "builder"}) + + return flushCh +} + +func startMessageFlusher(ctx context.Context, s *syncJob, messageBatchCH <-chan builtMessageBatch) <-chan flushUpdate { + flushUpdateCh := make(chan flushUpdate) + + // Goroutine which converts the messages into updates and builds a waitable structure for progress tracking. + async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { + defer close(flushUpdateCh) + defer func() { + logrus.Debugf("sync flush exit") + }() + + type updateTargetInfo struct { + queueIndex int + ch updatePublisher + } + + pendingUpdates := make([][]*imap.MessageCreated, len(s.updaters)) + addressToIndex := make(map[string]updateTargetInfo) + + { + i := 0 + for addrID, updateCh := range s.updaters { + addressToIndex[addrID] = updateTargetInfo{ + ch: updateCh, + queueIndex: i, + } + i++ + } + } + + for downloadBatch := range messageBatchCH { + logrus.Debugf("Flush batch: %v", len(downloadBatch.batch)) + for _, res := range downloadBatch.batch { + if res.err != nil { + if err := s.syncState.AddFailedMessageID(res.messageID); err != nil { + logrus.WithError(err).Error("Failed to add failed message ID") + } + + if err := s.reporter.ReportMessageWithContext("Failed to build message (sync)", reporter.Context{ + "messageID": res.messageID, + "error": res.err, + }); err != nil { + s.log.WithError(err).Error("Failed to report message build error") + } + + // We could sync a placeholder message here, but for now we skip it entirely. + continue + } + + if err := s.syncState.RemFailedMessageID(res.messageID); err != nil { + logrus.WithError(err).Error("Failed to remove failed message ID") + } + + targetInfo := addressToIndex[res.addressID] + pendingUpdates[targetInfo.queueIndex] = append(pendingUpdates[targetInfo.queueIndex], res.update) + } + + for _, info := range addressToIndex { + up := imap.NewMessagesCreated(true, pendingUpdates[info.queueIndex]...) + info.ch.publishUpdate(ctx, up) + + err, ok := up.WaitContext(ctx) + if ok && err != nil { + flushUpdateCh <- flushUpdate{ + err: fmt.Errorf("failed to apply sync update to gluon %v: %w", up.String(), err), + } + return + } + + pendingUpdates[info.queueIndex] = pendingUpdates[info.queueIndex][:0] + } + + select { + case flushUpdateCh <- flushUpdate{ + messageID: downloadBatch.batch[0].messageID, + err: nil, + batchLen: len(downloadBatch.batch), + }: + case <-ctx.Done(): + return + } + } + }, logging.Labels{"sync-stage": "flush"}) + + return flushUpdateCh +} diff --git a/internal/services/imapservice/sync_reporter.go b/internal/services/imapservice/sync_reporter.go new file mode 100644 index 00000000..7adda6ca --- /dev/null +++ b/internal/services/imapservice/sync_reporter.go @@ -0,0 +1,72 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "time" + + "github.com/ProtonMail/proton-bridge/v3/internal/events" +) + +type syncReporter struct { + userID string + eventPublisher events.EventPublisher + + start time.Time + total int + count int + + last time.Time + freq time.Duration +} + +func newSyncReporter(userID string, eventsPublisher events.EventPublisher, total int, freq time.Duration) *syncReporter { + return &syncReporter{ + userID: userID, + eventPublisher: eventsPublisher, + + start: time.Now(), + total: total, + freq: freq, + } +} + +func (rep *syncReporter) add(ctx context.Context, delta int) { + rep.count += delta + + if time.Since(rep.last) > rep.freq { + rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{ + UserID: rep.userID, + Progress: float64(rep.count) / float64(rep.total), + Elapsed: time.Since(rep.start), + Remaining: time.Since(rep.start) * time.Duration(rep.total-(rep.count+1)) / time.Duration(rep.count+1), + }) + + rep.last = time.Now() + } +} + +func (rep *syncReporter) done(ctx context.Context) { + rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{ + UserID: rep.userID, + Progress: 1, + Elapsed: time.Since(rep.start), + Remaining: 0, + }) +}