diff --git a/go.sum b/go.sum index f51e5f58..e0af3f30 100644 --- a/go.sum +++ b/go.sum @@ -266,7 +266,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/internal/bridge/bridge.go b/internal/bridge/bridge.go index ba0a884a..30ce9633 100644 --- a/internal/bridge/bridge.go +++ b/internal/bridge/bridge.go @@ -41,6 +41,7 @@ import ( "github.com/ProtonMail/proton-bridge/v3/internal/safe" "github.com/ProtonMail/proton-bridge/v3/internal/sentry" "github.com/ProtonMail/proton-bridge/v3/internal/services/imapsmtpserver" + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" "github.com/ProtonMail/proton-bridge/v3/internal/telemetry" "github.com/ProtonMail/proton-bridge/v3/internal/user" "github.com/ProtonMail/proton-bridge/v3/internal/vault" @@ -127,6 +128,7 @@ type Bridge struct { goHeartbeat func() serverManager *imapsmtpserver.Service + syncService *syncservice.Service } // New creates a new bridge. @@ -268,7 +270,8 @@ func newBridge( firstStart: firstStart, lastVersion: lastVersion, - tasks: tasks, + tasks: tasks, + syncService: syncservice.NewService(reporter, panicHandler), } bridge.serverManager = imapsmtpserver.NewService(context.Background(), @@ -285,6 +288,8 @@ func newBridge( return nil, err } + bridge.syncService.Run(bridge.tasks) + return bridge, nil } diff --git a/internal/bridge/types.go b/internal/bridge/types.go index dab3c816..5373257d 100644 --- a/internal/bridge/types.go +++ b/internal/bridge/types.go @@ -32,6 +32,7 @@ type Locator interface { GetLicenseFilePath() string GetDependencyLicensesLink() string Clear(...string) error + ProvideIMAPSyncConfigPath() (string, error) } type ProxyController interface { diff --git a/internal/bridge/user.go b/internal/bridge/user.go index 011b5cd4..594ccb7e 100644 --- a/internal/bridge/user.go +++ b/internal/bridge/user.go @@ -30,6 +30,7 @@ import ( "github.com/ProtonMail/proton-bridge/v3/internal/events" "github.com/ProtonMail/proton-bridge/v3/internal/logging" "github.com/ProtonMail/proton-bridge/v3/internal/safe" + "github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice" "github.com/ProtonMail/proton-bridge/v3/internal/try" "github.com/ProtonMail/proton-bridge/v3/internal/user" "github.com/ProtonMail/proton-bridge/v3/internal/vault" @@ -243,6 +244,11 @@ func (bridge *Bridge) LogoutUser(ctx context.Context, userID string) error { func (bridge *Bridge) DeleteUser(ctx context.Context, userID string) error { logrus.WithField("userID", userID).Info("Deleting user") + syncConfigDir, err := bridge.locator.ProvideIMAPSyncConfigPath() + if err != nil { + return fmt.Errorf("failed to get sync config path") + } + return safe.LockRet(func() error { if !bridge.vault.HasUser(userID) { return ErrNoSuchUser @@ -252,6 +258,10 @@ func (bridge *Bridge) DeleteUser(ctx context.Context, userID string) error { bridge.logoutUser(ctx, user, true, true, !bridge.GetTelemetryDisabled()) } + if err := imapservice.DeleteSyncState(syncConfigDir, userID); err != nil { + return fmt.Errorf("failed to delete use sync config") + } + if err := bridge.vault.DeleteUser(userID); err != nil { logrus.WithError(err).Error("Failed to delete vault user") } @@ -510,6 +520,11 @@ func (bridge *Bridge) addUserWithVault( return fmt.Errorf("failed to get Statistics directory: %w", err) } + syncSettingsPath, err := bridge.locator.ProvideIMAPSyncConfigPath() + if err != nil { + return fmt.Errorf("failed to get IMAP sync config path: %w", err) + } + user, err := user.New( ctx, vault, @@ -524,6 +539,8 @@ func (bridge *Bridge) addUserWithVault( bridge.serverManager, bridge.serverManager, &bridgeEventSubscription{b: bridge}, + bridge.syncService, + syncSettingsPath, ) if err != nil { return fmt.Errorf("failed to create user: %w", err) diff --git a/internal/locations/locations.go b/internal/locations/locations.go index 39ca4099..c22e5b6a 100644 --- a/internal/locations/locations.go +++ b/internal/locations/locations.go @@ -198,6 +198,14 @@ func (l *Locations) ProvideStatsPath() (string, error) { return l.getStatsPath(), nil } +func (l *Locations) ProvideIMAPSyncConfigPath() (string, error) { + if err := os.MkdirAll(l.getIMAPSyncConfigPath(), 0o700); err != nil { + return "", err + } + + return l.getIMAPSyncConfigPath(), nil +} + func (l *Locations) getGluonCachePath() string { return filepath.Join(l.userData, "gluon") } @@ -214,6 +222,10 @@ func (l *Locations) getSettingsPath() string { return l.userConfig } +func (l *Locations) getIMAPSyncConfigPath() string { + return filepath.Join(l.userConfig, "imap-sync") +} + func (l *Locations) getLogsPath() string { return filepath.Join(l.userData, "logs") } diff --git a/internal/services/imapservice/api_client.go b/internal/services/imapservice/api_client.go index 54b3de99..c6bf2630 100644 --- a/internal/services/imapservice/api_client.go +++ b/internal/services/imapservice/api_client.go @@ -35,6 +35,7 @@ type APIClient interface { UnlabelMessages(ctx context.Context, messageIDs []string, labelID string) error GetLabels(ctx context.Context, labelTypes ...proton.LabelType) ([]proton.Label, error) + GetGroupedMessageCount(ctx context.Context) ([]proton.MessageGroupCount, error) GetMessage(ctx context.Context, messageID string) (proton.Message, error) GetMessageMetadataPage(ctx context.Context, page, pageSize int, filter proton.MessageFilter) ([]proton.MessageMetadata, error) GetAllMessageIDs(ctx context.Context, afterID string) ([]string, error) diff --git a/internal/services/imapservice/helpers.go b/internal/services/imapservice/helpers.go index af9e0b3e..83d4a021 100644 --- a/internal/services/imapservice/helpers.go +++ b/internal/services/imapservice/helpers.go @@ -18,7 +18,6 @@ package imapservice import ( - "context" "fmt" "net/mail" "time" @@ -181,11 +180,3 @@ func wantLabels(apiLabels map[string]proton.Label, labelIDs []string) []string { return WantLabel(apiLabel) }) } - -// sleepCtx sleeps for the given duration, or until the context is canceled. -func sleepCtx(ctx context.Context, d time.Duration) { - select { - case <-ctx.Done(): - case <-time.After(d): - } -} diff --git a/internal/services/imapservice/server_manager.go b/internal/services/imapservice/server_manager.go index 2085f778..e25f31b9 100644 --- a/internal/services/imapservice/server_manager.go +++ b/internal/services/imapservice/server_manager.go @@ -21,6 +21,7 @@ import ( "context" "github.com/ProtonMail/gluon/connector" + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" ) type IMAPServerManager interface { @@ -29,7 +30,7 @@ type IMAPServerManager interface { connector connector.Connector, addrID string, idProvider GluonIDProvider, - syncStateProvider SyncStateProvider, + syncStateProvider syncservice.StateProvider, ) error RemoveIMAPUser(ctx context.Context, deleteData bool, provider GluonIDProvider, addrID ...string) error @@ -42,7 +43,7 @@ func (n NullIMAPServerManager) AddIMAPUser( _ connector.Connector, _ string, _ GluonIDProvider, - _ SyncStateProvider, + _ syncservice.StateProvider, ) error { return nil } diff --git a/internal/services/imapservice/service.go b/internal/services/imapservice/service.go index 168434a2..af40fcd2 100644 --- a/internal/services/imapservice/service.go +++ b/internal/services/imapservice/service.go @@ -19,7 +19,10 @@ package imapservice import ( "context" + "errors" "fmt" + "path/filepath" + "time" "github.com/ProtonMail/gluon/async" "github.com/ProtonMail/gluon/reporter" @@ -28,12 +31,13 @@ import ( "github.com/ProtonMail/proton-bridge/v3/internal/events" "github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks" "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder" + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents" "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" - "github.com/ProtonMail/proton-bridge/v3/internal/vault" "github.com/ProtonMail/proton-bridge/v3/pkg/cpc" "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" ) type EventProvider interface { @@ -55,16 +59,6 @@ type GluonIDProvider interface { GluonKey() []byte } -type SyncStateProvider interface { - AddFailedMessageID(messageID string) error - RemFailedMessageID(messageID string) error - GetSyncStatus() vault.SyncStatus - ClearSyncStatus() error - SetHasLabels(bool) error - SetHasMessages(bool) error - SetLastMessageID(messageID string) error -} - type Service struct { log *logrus.Entry cpc *cpc.CPC @@ -76,11 +70,10 @@ type Service struct { subscription *userevents.EventChanneledSubscriber - gluonIDProvider GluonIDProvider - syncStateProvider SyncStateProvider - eventProvider EventProvider - serverManager IMAPServerManager - eventPublisher events.EventPublisher + gluonIDProvider GluonIDProvider + eventProvider EventProvider + serverManager IMAPServerManager + eventPublisher events.EventPublisher telemetry Telemetry panicHandler async.PanicHandler @@ -92,14 +85,20 @@ type Service struct { connectors map[string]*Connector maxSyncMemory uint64 showAllMail bool - syncHandler *syncHandler + + syncHandler *syncservice.Handler + syncUpdateApplier *SyncUpdateApplier + syncMessageBuilder *SyncMessageBuilder + syncStateProvider *SyncState + syncReporter *syncReporter + + syncConfigPath string } func NewService( client APIClient, identityState *useridentity.State, gluonIDProvider GluonIDProvider, - syncStateProvider SyncStateProvider, eventProvider EventProvider, serverManager IMAPServerManager, eventPublisher events.EventPublisher, @@ -111,27 +110,34 @@ func NewService( reporter reporter.Reporter, addressMode usertypes.AddressMode, subscription events.Subscription, + syncConfigDir string, maxSyncMemory uint64, showAllMail bool, ) *Service { subscriberName := fmt.Sprintf("imap-%v", identityState.User.ID) + log := logrus.WithFields(logrus.Fields{ + "user": identityState.User.ID, + "service": "imap", + }) + rwIdentity := newRWIdentity(identityState, bridgePassProvider, keyPassProvider) + + syncUpdateApplier := NewSyncUpdateApplier() + syncMessageBuilder := NewSyncMessageBuilder(rwIdentity) + syncReporter := newSyncReporter(identityState.User.ID, eventPublisher, time.Second) + return &Service{ - cpc: cpc.NewCPC(), - log: logrus.WithFields(logrus.Fields{ - "user": identityState.User.ID, - "service": "imap", - }), + cpc: cpc.NewCPC(), client: client, - identityState: newRWIdentity(identityState, bridgePassProvider, keyPassProvider), + log: log, + identityState: rwIdentity, labels: newRWLabels(), addressMode: addressMode, - gluonIDProvider: gluonIDProvider, - serverManager: serverManager, - syncStateProvider: syncStateProvider, - eventProvider: eventProvider, - eventPublisher: eventPublisher, + gluonIDProvider: gluonIDProvider, + serverManager: serverManager, + eventProvider: eventProvider, + eventPublisher: eventPublisher, subscription: userevents.NewEventSubscriber(subscriberName), @@ -146,10 +152,31 @@ func NewService( eventWatcher: subscription.Add(events.IMAPServerCreated{}), eventSubscription: subscription, showAllMail: showAllMail, + + syncUpdateApplier: syncUpdateApplier, + syncMessageBuilder: syncMessageBuilder, + syncReporter: syncReporter, + syncConfigPath: getSyncConfigPath(syncConfigDir, identityState.User.ID), } } -func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error { +func (s *Service) Start( + ctx context.Context, + group *orderedtasks.OrderedCancelGroup, + syncRegulator syncservice.Regulator, + +) error { + { + syncStateProvider, err := NewSyncState(s.syncConfigPath) + if err != nil { + return fmt.Errorf("failed to load sync state: %w", err) + } + + s.syncStateProvider = syncStateProvider + } + + s.syncHandler = syncservice.NewHandler(syncRegulator, s.client, s.identityState.UserID(), s.syncStateProvider, s.log, s.panicHandler) + // Get user labels apiLabels, err := s.client.GetLabels(ctx, proton.LabelTypeSystem, proton.LabelTypeFolder, proton.LabelTypeLabel) if err != nil { @@ -227,6 +254,10 @@ func (s *Service) GetLabels(ctx context.Context) (map[string]proton.Label, error return cpc.SendTyped[map[string]proton.Label](ctx, s.cpc, &getLabelsReq{}) } +func (s *Service) GetSyncFailedMessageIDs(ctx context.Context) ([]string, error) { + return cpc.SendTyped[[]string](ctx, s.cpc, &getSyncFailedMessagesReq{}) +} + func (s *Service) Close() { for _, c := range s.connectors { c.StateClose() @@ -251,7 +282,7 @@ func (s *Service) HandleRefreshEvent(ctx context.Context, _ proton.RefreshFlag) return err } - if err := s.syncStateProvider.ClearSyncStatus(); err != nil { + if err := s.syncStateProvider.ClearSyncStatus(ctx); err != nil { return fmt.Errorf("failed to clear sync status:%w", err) } @@ -259,7 +290,7 @@ func (s *Service) HandleRefreshEvent(ctx context.Context, _ proton.RefreshFlag) return err } - s.syncHandler.launch(s) + s.startSyncing() return nil } @@ -279,11 +310,9 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo defer s.cpc.Close() defer s.eventSubscription.Remove(s.eventWatcher) - - s.syncHandler = newSyncHandler(ctx, s.panicHandler) defer s.syncHandler.Close() - s.syncHandler.launch(s) + s.startSyncing() eventHandler := userevents.EventHandler{ UserHandler: s, @@ -307,7 +336,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo } switch r := req.Value().(type) { case *setAddressModeReq: - err := s.setAddressMode(ctx, s.syncHandler, r.mode) + err := s.setAddressMode(ctx, r.mode) req.Reply(ctx, nil, err) case *resyncReq: @@ -325,7 +354,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo s.log.Info("Resuming sync") // Cancel previous run, if any, just in case. s.syncHandler.CancelAndWait() - s.syncHandler.launch(s) + s.startSyncing() req.Reply(ctx, nil, nil) case *getLabelsReq: labels := s.labels.GetLabelMap() @@ -347,6 +376,15 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo req.Reply(ctx, nil, nil) s.setShowAllMail(r.v) + case *getSyncFailedMessagesReq: + status, err := s.syncStateProvider.GetSyncStatus(ctx) + if err != nil { + req.Reply(ctx, nil, fmt.Errorf("failed to get sync status: %w", err)) + continue + } + + req.Reply(ctx, maps.Keys(status.FailedMessages), nil) + default: s.log.Error("Received unknown request") } @@ -366,11 +404,19 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo s.eventProvider.ResumeIMAP() } - case update, ok := <-s.syncHandler.updater.ch: + case request, ok := <-s.syncUpdateApplier.requestCh: if !ok { continue } - s.onSyncUpdate(ctx, update) + + updates, err := request(ctx, s.addressMode, s.connectors) + + if err := s.syncUpdateApplier.reply(ctx, updates, err); err != nil { + if !errors.Is(err, context.Canceled) { + s.log.WithError(err).Error("unexpected error during sync update reply") + } + return + } case e, ok := <-s.subscription.OnEventCh(): if !ok { @@ -449,17 +495,6 @@ func (s *Service) rebuildConnectors() error { return nil } -func (s *Service) onSyncUpdate(ctx context.Context, syncUpdate syncUpdate) { - c, ok := s.connectors[syncUpdate.addrID] - if !ok { - s.log.Warningf("Received syncUpdate for unknown addr (%v), connector may have been removed", syncUpdate.addrID) - syncUpdate.update.Done(fmt.Errorf("undeliverable")) - return - } - - c.publishUpdate(ctx, syncUpdate.update) -} - func (s *Service) addConnectorsToServer(ctx context.Context, connectors map[string]*Connector) error { addedConnectors := make([]string, 0, len(connectors)) for _, c := range connectors { @@ -490,7 +525,7 @@ func (s *Service) removeConnectorsFromServer(ctx context.Context, connectors map return nil } -func (s *Service) setAddressMode(ctx context.Context, handler *syncHandler, mode usertypes.AddressMode) error { +func (s *Service) setAddressMode(ctx context.Context, mode usertypes.AddressMode) error { if s.addressMode == mode { return nil } @@ -502,13 +537,13 @@ func (s *Service) setAddressMode(ctx context.Context, handler *syncHandler, mode s.log.Info("Setting Combined Address Mode") } - handler.CancelAndWait() + s.syncHandler.CancelAndWait() if err := s.removeConnectorsFromServer(ctx, s.connectors, true); err != nil { return err } - if err := s.syncStateProvider.ClearSyncStatus(); err != nil { + if err := s.syncStateProvider.ClearSyncStatus(ctx); err != nil { return fmt.Errorf("failed to clear sync status:%w", err) } @@ -520,7 +555,7 @@ func (s *Service) setAddressMode(ctx context.Context, handler *syncHandler, mode return err } - handler.launch(s) + s.startSyncing() return nil } @@ -537,6 +572,10 @@ func (s *Service) setShowAllMail(v bool) { } } +func (s *Service) startSyncing() { + s.syncHandler.Execute(s.syncReporter, s.labels.GetLabelMap(), s.syncUpdateApplier, s.syncMessageBuilder, syncservice.DefaultRetryCoolDown) +} + type resyncReq struct{} type cancelSyncReq struct{} @@ -556,3 +595,9 @@ type showAllMailReq struct{ v bool } type setAddressModeReq struct { mode usertypes.AddressMode } + +type getSyncFailedMessagesReq struct{} + +func getSyncConfigPath(path string, userID string) string { + return filepath.Join(path, fmt.Sprintf("sync-%v", userID)) +} diff --git a/internal/services/imapservice/service_address_events.go b/internal/services/imapservice/service_address_events.go index 5f673243..2c5aed9c 100644 --- a/internal/services/imapservice/service_address_events.go +++ b/internal/services/imapservice/service_address_events.go @@ -136,7 +136,12 @@ func addNewAddressSplitMode(ctx context.Context, s *Service, addrID string) erro s.connectors[connector.addrID] = connector - if err := syncLabels(ctx, s.labels.GetLabelMap(), connector); err != nil { + updates, err := syncLabels(ctx, s.labels.GetLabelMap(), []*Connector{connector}) + if err != nil { + return fmt.Errorf("failed to create labels updates for new address: %w", err) + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { return fmt.Errorf("failed to sync labels for new address: %w", err) } diff --git a/internal/services/imapservice/service_message_events.go b/internal/services/imapservice/service_message_events.go index 32391462..dfda43a4 100644 --- a/internal/services/imapservice/service_message_events.go +++ b/internal/services/imapservice/service_message_events.go @@ -148,7 +148,7 @@ func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMet if res.err != nil { s.log.WithError(err).Error("Failed to build RFC822 message") - if err := s.syncStateProvider.AddFailedMessageID(message.ID); err != nil { + if err := s.syncStateProvider.AddFailedMessageID(ctx, message.ID); err != nil { s.log.WithError(err).Error("Failed to add failed message ID to vault") } @@ -157,7 +157,7 @@ func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMet return nil } - if err := s.syncStateProvider.RemFailedMessageID(message.ID); err != nil { + if err := s.syncStateProvider.RemFailedMessageID(ctx, message.ID); err != nil { s.log.WithError(err).Error("Failed to remove failed message ID from vault") } @@ -211,7 +211,7 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me if res.err != nil { logrus.WithError(err).Error("Failed to build RFC822 message") - if err := s.syncStateProvider.AddFailedMessageID(event.ID); err != nil { + if err := s.syncStateProvider.AddFailedMessageID(ctx, event.ID); err != nil { s.log.WithError(err).Error("Failed to add failed message ID to vault") } @@ -220,7 +220,7 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me return nil } - if err := s.syncStateProvider.RemFailedMessageID(event.ID); err != nil { + if err := s.syncStateProvider.RemFailedMessageID(ctx, event.ID); err != nil { s.log.WithError(err).Error("Failed to remove failed message ID from vault") } diff --git a/internal/services/imapservice/service_sync.go b/internal/services/imapservice/service_sync.go deleted file mode 100644 index ba682825..00000000 --- a/internal/services/imapservice/service_sync.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (c) 2023 Proton AG -// -// This file is part of Proton Mail Bridge. -// -// Proton Mail Bridge is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Proton Mail Bridge is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with Proton Mail Bridge. If not, see . - -package imapservice - -import ( - "context" - "fmt" - - "github.com/ProtonMail/gluon/async" - "github.com/ProtonMail/gluon/imap" -) - -type syncUpdate struct { - addrID string - update imap.Update -} - -type syncUpdater struct { - ch chan syncUpdate -} - -type syncUpdatePublisher struct { - addrID string - updater *syncUpdater -} - -func (s *syncUpdatePublisher) publishUpdate(ctx context.Context, update imap.Update) { - select { - case <-ctx.Done(): - update.Done(fmt.Errorf("not applied: %w", ctx.Err())) - return - - case s.updater.ch <- syncUpdate{addrID: s.addrID, update: update}: - } -} - -func newSyncUpdater() *syncUpdater { - return &syncUpdater{ch: make(chan syncUpdate)} -} - -func (s *syncUpdater) createPublisher(addrID string) *syncUpdatePublisher { - return &syncUpdatePublisher{updater: s, addrID: addrID} -} - -func (s *syncUpdater) Close() { - close(s.ch) -} - -type syncHandler struct { - group *async.Group - updater *syncUpdater - syncFinishedCh chan error -} - -func newSyncHandler(ctx context.Context, handler async.PanicHandler) *syncHandler { - return &syncHandler{ - group: async.NewGroup(ctx, handler), - updater: newSyncUpdater(), - syncFinishedCh: make(chan error, 2), - } -} - -func (s *syncHandler) Close() { - s.group.CancelAndWait() - close(s.syncFinishedCh) -} - -func (s *syncHandler) CancelAndWait() { - s.group.CancelAndWait() -} - -func (s *syncHandler) Cancel() { - s.group.Cancel() -} - -func (s *syncHandler) OnSyncFinishedCH() <-chan error { - return s.syncFinishedCh -} - -func (s *syncHandler) launch(service *Service) { - service.eventProvider.PauseIMAP() - - labels := service.labels.GetLabelMap() - - updaters := make(map[string]updatePublisher, len(service.connectors)) - - for _, c := range service.connectors { - updaters[c.addrID] = s.updater.createPublisher(c.addrID) - } - - state := &syncJob{ - client: service.client, - userID: service.identityState.UserID(), - labels: labels, - updaters: updaters, - addressMode: service.addressMode, - syncState: service.syncStateProvider, - eventPublisher: service.eventPublisher, - log: service.log, - // We make a copy of the identity state to avoid holding on to locks for a very long time. - identityState: service.identityState.Clone(), - panicHandler: service.panicHandler, - reporter: service.reporter, - maxSyncMemory: service.maxSyncMemory, - keyProvider: service.identityState.keyPassProvider, - } - - s.group.Once(func(ctx context.Context) { - err := state.run(ctx) - s.syncFinishedCh <- err - }) -} diff --git a/internal/services/imapservice/shared_identity.go b/internal/services/imapservice/shared_identity.go index 1d02a2c7..c53575fb 100644 --- a/internal/services/imapservice/shared_identity.go +++ b/internal/services/imapservice/shared_identity.go @@ -95,6 +95,13 @@ func (r *rwIdentity) WithAddrKR(addrID string, fn func(userKR *crypto.KeyRing, a return r.identity.WithAddrKR(addrID, r.keyPassProvider.KeyPass(), fn) } +func (r *rwIdentity) WithAddrKRs(fn func(*crypto.KeyRing, map[string]*crypto.KeyRing) error) error { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.identity.WithAddrKRs(r.keyPassProvider.KeyPass(), fn) +} + func (r *rwIdentity) CheckAuth(email string, password []byte, telemetry Telemetry) (string, error) { r.lock.RLock() defer r.lock.RUnlock() diff --git a/internal/services/imapservice/sync.go b/internal/services/imapservice/sync.go deleted file mode 100644 index 254294ae..00000000 --- a/internal/services/imapservice/sync.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright (c) 2023 Proton AG -// -// This file is part of Proton Mail Bridge. -// -// Proton Mail Bridge is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Proton Mail Bridge is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with Proton Mail Bridge. If not, see . - -package imapservice - -import ( - "context" - "fmt" - "time" - - "github.com/ProtonMail/gluon/async" - "github.com/ProtonMail/gluon/imap" - "github.com/ProtonMail/gluon/reporter" - "github.com/ProtonMail/proton-bridge/v3/internal/events" - "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" - "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" - "github.com/bradenaw/juniper/xslices" - "github.com/sirupsen/logrus" - "golang.org/x/exp/maps" - "golang.org/x/exp/slices" -) - -type updatePublisher interface { - publishUpdate(ctx context.Context, update imap.Update) -} - -type syncJob struct { - client APIClient - userID string - labels labelMap - updaters map[string]updatePublisher - addressMode usertypes.AddressMode - syncState SyncStateProvider - eventPublisher events.EventPublisher - log *logrus.Entry - identityState *useridentity.State - panicHandler async.PanicHandler - reporter reporter.Reporter - keyProvider useridentity.KeyPassProvider - maxSyncMemory uint64 -} - -const SyncRetryCoolDown = 20 * time.Second - -func (s *syncJob) run(ctx context.Context) error { - s.log.Info("Sync triggered") - s.eventPublisher.PublishEvent(ctx, events.SyncStarted{UserID: s.userID}) - - if s.syncState.GetSyncStatus().IsComplete() { - s.log.Info("Sync already complete, only system labels will be updated") - - if err := s.syncSystemLabels(ctx); err != nil { - s.log.WithError(err).Error("Failed to sync system labels") - s.eventPublisher.PublishEvent(ctx, events.SyncFailed{ - UserID: s.userID, - Error: err, - }) - return err - } - s.eventPublisher.PublishEvent(ctx, events.SyncFinished{UserID: s.userID}) - return nil - } - - for { - if err := ctx.Err(); err != nil { - s.log.WithError(err).Error("Sync aborted") - return fmt.Errorf("sync aborted: %w", ctx.Err()) - } else if err := s.doSync(ctx); err != nil { - s.log.WithError(err).Error("Failed to sync, will retry later") - sleepCtx(ctx, SyncRetryCoolDown) - } else { - break - } - } - - return nil -} - -func (s *syncJob) syncSystemLabels(ctx context.Context) error { - var updates []imap.Update - - for _, label := range s.labels { - if !WantLabel(label) { - continue - } - - for _, connector := range s.updaters { - update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name) - connector.publishUpdate(ctx, update) - updates = append(updates, update) - } - } - - if err := waitOnIMAPUpdates(ctx, updates); err != nil { - return fmt.Errorf("could not sync system labels: %w", err) - } - - return nil -} - -func (s *syncJob) doSync(ctx context.Context) error { - start := time.Now() - - s.log.WithField("start", start).Info("Beginning user sync") - - s.eventPublisher.PublishEvent(ctx, events.SyncStarted{ - UserID: s.userID, - }) - - if err := s.sync(ctx); err != nil { - s.log.WithError(err).Warn("Failed to sync user") - - s.eventPublisher.PublishEvent(ctx, events.SyncFailed{ - UserID: s.userID, - Error: err, - }) - - return fmt.Errorf("failed to sync: %w", err) - } - - s.log.WithField("duration", time.Since(start)).Info("Finished user sync") - - s.eventPublisher.PublishEvent(ctx, events.SyncFinished{ - UserID: s.userID, - }) - - return nil -} - -func (s *syncJob) sync(ctx context.Context) error { - syncStatus := s.syncState.GetSyncStatus() - - if !syncStatus.HasLabels { - s.log.Info("Syncing labels") - - if err := syncLabels(ctx, s.labels, maps.Values(s.updaters)...); err != nil { - return fmt.Errorf("failed to sync labels: %w", err) - } - - if err := s.syncState.SetHasLabels(true); err != nil { - return fmt.Errorf("failed to set has labels: %w", err) - } - - s.log.Info("Synced labels") - } - - if !syncStatus.HasMessages { - s.log.Info("Syncing messages") - - // Determine which messages to sync. - messageIDs, err := s.client.GetAllMessageIDs(ctx, "") - if err != nil { - return fmt.Errorf("failed to get message IDs to sync: %w", err) - } - - s.log.Debugf("User has the following failed synced message ids: %v", syncStatus.FailedMessageIDs) - - // Remove any messages that have already failed to sync. - messageIDs = xslices.Filter(messageIDs, func(messageID string) bool { - return !slices.Contains(syncStatus.FailedMessageIDs, messageID) - }) - - // Reverse the order of the message IDs so that the newest messages are synced first. - xslices.Reverse(messageIDs) - - // If we have a message ID that we've already synced, then we can skip all messages before it. - if idx := xslices.Index(messageIDs, syncStatus.LastMessageID); idx >= 0 { - messageIDs = messageIDs[idx+1:] - } - - // Sync the messages. - if err := s.syncMessages( - ctx, - messageIDs, - ); err != nil { - return fmt.Errorf("failed to sync messages: %w", err) - } - - if err := s.syncState.SetHasMessages(true); err != nil { - return fmt.Errorf("failed to set has messages: %w", err) - } - - s.log.Info("Synced messages") - } else { - s.log.Info("Messages are already synced, skipping") - } - - return nil -} diff --git a/internal/services/imapservice/sync_attachment_downloader.go b/internal/services/imapservice/sync_attachment_downloader.go deleted file mode 100644 index 5acf6631..00000000 --- a/internal/services/imapservice/sync_attachment_downloader.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright (c) 2023 Proton AG -// -// This file is part of Proton Mail Bridge. -// -// Proton Mail Bridge is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Proton Mail Bridge is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with Proton Mail Bridge. If not, see . - -package imapservice - -import ( - "bytes" - "context" - "fmt" - - "github.com/ProtonMail/gluon/async" - "github.com/ProtonMail/gluon/logging" - "github.com/ProtonMail/go-proton-api" -) - -type attachmentResult struct { - attachment []byte - err error -} - -type attachmentJob struct { - id string - size int64 - result chan attachmentResult -} - -type attachmentDownloader struct { - workerCh chan attachmentJob - cancel context.CancelFunc -} - -func attachmentWorker(ctx context.Context, client APIClient, work <-chan attachmentJob) { - for { - select { - case <-ctx.Done(): - return - case job, ok := <-work: - if !ok { - return - } - var b bytes.Buffer - b.Grow(int(job.size)) - err := client.GetAttachmentInto(ctx, job.id, &b) - select { - case <-ctx.Done(): - close(job.result) - return - case job.result <- attachmentResult{attachment: b.Bytes(), err: err}: - close(job.result) - } - } - } -} - -func (s *syncJob) newAttachmentDownloader(ctx context.Context, client APIClient, workerCount int) *attachmentDownloader { - workerCh := make(chan attachmentJob, (workerCount+2)*workerCount) - ctx, cancel := context.WithCancel(ctx) - for i := 0; i < workerCount; i++ { - workerCh = make(chan attachmentJob) - async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { attachmentWorker(ctx, client, workerCh) }, logging.Labels{ - "sync": fmt.Sprintf("att-downloader %v", i), - }) - } - - return &attachmentDownloader{ - workerCh: workerCh, - cancel: cancel, - } -} - -func (a *attachmentDownloader) getAttachments(ctx context.Context, attachments []proton.Attachment) ([][]byte, error) { - resultChs := make([]chan attachmentResult, len(attachments)) - for i, id := range attachments { - resultChs[i] = make(chan attachmentResult, 1) - select { - case a.workerCh <- attachmentJob{id: id.ID, result: resultChs[i], size: id.Size}: - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - result := make([][]byte, len(attachments)) - var err error - for i := 0; i < len(attachments); i++ { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case r := <-resultChs[i]: - if r.err != nil { - err = fmt.Errorf("failed to get attachment %v: %w", attachments[i], r.err) - } - result[i] = r.attachment - } - } - - return result, err -} - -func (a *attachmentDownloader) close() { - a.cancel() -} diff --git a/internal/services/imapservice/sync_build_test.go b/internal/services/imapservice/sync_build_test.go index a47e11e8..c6e077e0 100644 --- a/internal/services/imapservice/sync_build_test.go +++ b/internal/services/imapservice/sync_build_test.go @@ -24,8 +24,6 @@ import ( "github.com/ProtonMail/gluon/imap" "github.com/ProtonMail/gluon/rfc822" - "github.com/ProtonMail/go-proton-api" - "github.com/bradenaw/juniper/xslices" "github.com/stretchr/testify/require" ) @@ -49,32 +47,3 @@ func TestNewFailedMessageLiteral(t *testing.T) { require.Equal(t, `("text" "plain" () NIL NIL "base64" 114 2)`, parsed.Body) require.Equal(t, `("text" "plain" () NIL NIL "base64" 114 2 NIL NIL NIL NIL)`, parsed.Structure) } - -func TestSyncChunkSyncBuilderBatch(t *testing.T) { - // GODT-2424 - Some messages were not fully built due to a bug in the chunking if the total memory used by the - // message would be higher than the maximum we allowed. - const totalMessageCount = 100 - - msg := proton.FullMessage{ - Message: proton.Message{ - Attachments: []proton.Attachment{ - { - Size: int64(8 * Megabyte), - }, - }, - }, - AttData: nil, - } - - messages := xslices.Repeat(msg, totalMessageCount) - - chunks := chunkSyncBuilderBatch(messages, 16*Megabyte) - - var totalMessagesInChunks int - - for _, v := range chunks { - totalMessagesInChunks += len(v) - } - - require.Equal(t, totalMessagesInChunks, totalMessageCount) -} diff --git a/internal/services/imapservice/sync_labels.go b/internal/services/imapservice/sync_labels.go deleted file mode 100644 index d6a9468d..00000000 --- a/internal/services/imapservice/sync_labels.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (c) 2023 Proton AG -// -// This file is part of Proton Mail Bridge. -// -// Proton Mail Bridge is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Proton Mail Bridge is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with Proton Mail Bridge. If not, see . - -package imapservice - -import ( - "context" - "fmt" - - "github.com/ProtonMail/gluon/imap" - "github.com/ProtonMail/go-proton-api" -) - -// nolint:exhaustive -func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updatePublishers ...updatePublisher) error { - var updates []imap.Update - - // Create placeholder Folders/Labels mailboxes with the \Noselect attribute. - for _, prefix := range []string{folderPrefix, labelPrefix} { - for _, updateCh := range updatePublishers { - update := newPlaceHolderMailboxCreatedUpdate(prefix) - updateCh.publishUpdate(ctx, update) - updates = append(updates, update) - } - } - - // Sync the user's labels. - for labelID, label := range apiLabels { - if !WantLabel(label) { - continue - } - - switch label.Type { - case proton.LabelTypeSystem: - for _, updateCh := range updatePublishers { - update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name) - updateCh.publishUpdate(ctx, update) - updates = append(updates, update) - } - - case proton.LabelTypeFolder, proton.LabelTypeLabel: - for _, updateCh := range updatePublishers { - update := newMailboxCreatedUpdate(imap.MailboxID(labelID), GetMailboxName(label)) - updateCh.publishUpdate(ctx, update) - updates = append(updates, update) - } - - default: - return fmt.Errorf("unknown label type: %d", label.Type) - } - } - - // Wait for all label updates to be applied. - for _, update := range updates { - err, ok := update.WaitContext(ctx) - if ok && err != nil { - return fmt.Errorf("failed to apply label create update in gluon %v: %w", update.String(), err) - } - } - - return nil -} diff --git a/internal/services/imapservice/sync_message_builder.go b/internal/services/imapservice/sync_message_builder.go new file mode 100644 index 00000000..45adbb61 --- /dev/null +++ b/internal/services/imapservice/sync_message_builder.go @@ -0,0 +1,63 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "bytes" + + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/gopenpgp/v2/crypto" + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" + "github.com/ProtonMail/proton-bridge/v3/pkg/message" +) + +type SyncMessageBuilder struct { + state *rwIdentity +} + +func NewSyncMessageBuilder(rw *rwIdentity) *SyncMessageBuilder { + return &SyncMessageBuilder{state: rw} +} + +func (s SyncMessageBuilder) WithKeys(f func(*crypto.KeyRing, map[string]*crypto.KeyRing) error) error { + return s.state.WithAddrKRs(f) +} + +func (s SyncMessageBuilder) BuildMessage( + apiLabels map[string]proton.Label, + full proton.FullMessage, + addrKR *crypto.KeyRing, + buffer *bytes.Buffer, +) (syncservice.BuildResult, error) { + buffer.Grow(full.Size) + + if err := message.BuildRFC822Into(addrKR, full.Message, full.AttData, defaultMessageJobOpts(), buffer); err != nil { + return syncservice.BuildResult{}, err + } + + update, err := newMessageCreatedUpdate(apiLabels, full.MessageMetadata, buffer.Bytes()) + if err != nil { + return syncservice.BuildResult{}, err + } + + return syncservice.BuildResult{ + AddressID: full.Message.AddressID, + MessageID: full.Message.ID, + Update: update, + }, nil +} diff --git a/internal/services/imapservice/sync_messages.go b/internal/services/imapservice/sync_messages.go deleted file mode 100644 index 4f71add4..00000000 --- a/internal/services/imapservice/sync_messages.go +++ /dev/null @@ -1,523 +0,0 @@ -// Copyright (c) 2023 Proton AG -// -// This file is part of Proton Mail Bridge. -// -// Proton Mail Bridge is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Proton Mail Bridge is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with Proton Mail Bridge. If not, see . - -package imapservice - -import ( - "bytes" - "context" - "fmt" - "os" - "runtime" - "time" - - "github.com/ProtonMail/gluon/async" - "github.com/ProtonMail/gluon/imap" - "github.com/ProtonMail/gluon/logging" - "github.com/ProtonMail/gluon/reporter" - "github.com/ProtonMail/go-proton-api" - "github.com/ProtonMail/gopenpgp/v2/crypto" - "github.com/bradenaw/juniper/parallel" - "github.com/bradenaw/juniper/xslices" - "github.com/pbnjay/memory" - "github.com/sirupsen/logrus" -) - -func (s *syncJob) syncMessages(ctx context.Context, messageIDs []string) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Track the amount of time to process all the messages. - syncStartTime := time.Now() - defer func() { logrus.WithField("duration", time.Since(syncStartTime)).Info("Message sync completed") }() - - s.log.WithFields(logrus.Fields{ - "messages": len(messageIDs), - "numCPU": runtime.NumCPU(), - }).Info("Starting message sync") - - // Create the flushers, one per update channel. - - // Create a reporter to report sync progress updates. - syncReporter := newSyncReporter(s.userID, s.eventPublisher, len(messageIDs), time.Second) - defer syncReporter.done(ctx) - - // Expected mem usage for this whole process should be the sum of MaxMessageBuildingMem and MaxDownloadRequestMem - // times x due to pipeline and all additional memory used by network requests and compression+io. - - totalMemory := memory.TotalMemory() - - syncLimits := newSyncLimits(s.maxSyncMemory) - - if syncLimits.MaxSyncMemory >= totalMemory/2 { - logrus.Warnf("Requested max sync memory of %v MB is greater than half of system memory (%v MB), forcing to half of system memory", - toMB(syncLimits.MaxSyncMemory), toMB(totalMemory/2)) - syncLimits.MaxSyncMemory = totalMemory / 2 - } - - if syncLimits.MaxSyncMemory < 800*Megabyte { - logrus.Warnf("Requested max sync memory of %v MB, but minimum recommended is 800 MB, forcing max syncMemory to 800MB", toMB(syncLimits.MaxSyncMemory)) - syncLimits.MaxSyncMemory = 800 * Megabyte - } - - logrus.Debugf("Total System Memory: %v", toMB(totalMemory)) - - // Linter says it's not used. This is a lie. - var syncMaxDownloadRequestMem uint64 - - // Linter says it's not used. This is a lie. - var syncMaxMessageBuildingMem uint64 - - // If less than 2GB available try and limit max memory to 512 MB - switch { - case syncLimits.MaxSyncMemory < 2*Gigabyte: - if syncLimits.MaxSyncMemory < 800*Megabyte { - logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes") - } - syncMaxDownloadRequestMem = syncLimits.MinDownloadRequestMem - syncMaxMessageBuildingMem = syncLimits.MinMessageBuildingMem - case syncLimits.MaxSyncMemory == 2*Gigabyte: - // Increasing the max download capacity has very little effect on sync speed. We could increase the download - // memory but the user would see less sync notifications. A smaller value here leads to more frequent - // updates. Additionally, most of sync time is spent in the message building. - syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem - // Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage. - syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem - default: - // Divide by 8 as download stage and build stage will use aprox. 4x the specified memory. - remainingMemory := (syncLimits.MaxSyncMemory - 2*Gigabyte) / 8 - syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem + remainingMemory - syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem + remainingMemory - } - - logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB", - toMB(syncMaxDownloadRequestMem), - toMB(syncMaxMessageBuildingMem), - toMB((syncMaxMessageBuildingMem*4)+(syncMaxDownloadRequestMem*4)), - ) - - downloadCh := startMetadataDownloader(ctx, s, messageIDs, syncMaxDownloadRequestMem) - buildCh, errorCh := startMessageDownloader(ctx, s, syncLimits, downloadCh) - flushCh := startMessageBuilder(ctx, s, buildCh, syncMaxMessageBuildingMem) - flushUpdateCh := startMessageFlusher(ctx, s, flushCh) - - for flushUpdate := range flushUpdateCh { - if flushUpdate.err != nil { - return flushUpdate.err - } - - if err := s.syncState.SetLastMessageID(flushUpdate.messageID); err != nil { - return fmt.Errorf("failed to set last synced message ID: %w", err) - } - - syncReporter.add(ctx, flushUpdate.batchLen) - } - - return <-errorCh -} - -const Kilobyte = uint64(1024) -const Megabyte = 1024 * Kilobyte -const Gigabyte = 1024 * Megabyte - -func toMB(v uint64) float64 { - return float64(v) / float64(Megabyte) -} - -type syncLimits struct { - MaxDownloadRequestMem uint64 - MinDownloadRequestMem uint64 - MaxMessageBuildingMem uint64 - MinMessageBuildingMem uint64 - MaxSyncMemory uint64 - MaxParallelDownloads int -} - -func newSyncLimits(maxSyncMemory uint64) syncLimits { - limits := syncLimits{ - // There's no point in using more than 128MB of download data per stage, after that we reach a point of diminishing - // returns as we can't keep the pipeline fed fast enough. - MaxDownloadRequestMem: 128 * Megabyte, - - // Any lower than this and we may fail to download messages. - MinDownloadRequestMem: 40 * Megabyte, - - // This value can be increased to your hearts content. The more system memory the user has, the more messages - // we can build in parallel. - MaxMessageBuildingMem: 128 * Megabyte, - MinMessageBuildingMem: 64 * Megabyte, - - // Maximum recommend value for parallel downloads by the API team. - MaxParallelDownloads: 20, - - MaxSyncMemory: maxSyncMemory, - } - - if _, ok := os.LookupEnv("BRIDGE_SYNC_FORCE_MINIMUM_SPEC"); ok { - logrus.Warn("Sync specs forced to minimum") - limits.MaxDownloadRequestMem = 50 * Megabyte - limits.MaxMessageBuildingMem = 80 * Megabyte - limits.MaxParallelDownloads = 2 - limits.MaxSyncMemory = 800 * Megabyte - } - - return limits -} - -func chunkSyncBuilderBatch(batch []proton.FullMessage, maxMemory uint64) [][]proton.FullMessage { - var expectedMemUsage uint64 - var chunks [][]proton.FullMessage - var lastIndex int - var index int - - for _, v := range batch { - var dataSize uint64 - for _, a := range v.Attachments { - dataSize += uint64(a.Size) - } - - // 2x increase for attachment due to extra memory needed for decrypting and writing - // in memory buffer. - dataSize *= 2 - dataSize += uint64(len(v.Body)) - - nextMemSize := expectedMemUsage + dataSize - if nextMemSize >= maxMemory { - chunks = append(chunks, batch[lastIndex:index]) - lastIndex = index - expectedMemUsage = dataSize - } else { - expectedMemUsage = nextMemSize - } - - index++ - } - - if lastIndex < len(batch) { - chunks = append(chunks, batch[lastIndex:]) - } - - return chunks -} - -type flushUpdate struct { - messageID string - err error - batchLen int -} - -type downloadRequest struct { - ids []string - expectedSize uint64 - err error -} - -type downloadedMessageBatch struct { - batch []proton.FullMessage -} - -type builtMessageBatch struct { - batch []*buildRes -} - -func startMetadataDownloader(ctx context.Context, s *syncJob, messageIDs []string, syncMaxDownloadRequestMem uint64) <-chan downloadRequest { - downloadCh := make(chan downloadRequest) - // Go routine in charge of downloading message metadata - async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { - defer close(downloadCh) - const MetadataDataPageSize = 150 - - var downloadReq downloadRequest - downloadReq.ids = make([]string, 0, MetadataDataPageSize) - - metadataChunks := xslices.Chunk(messageIDs, MetadataDataPageSize) - for i, metadataChunk := range metadataChunks { - logrus.Debugf("Metadata Request (%v of %v), previous: %v", i, len(metadataChunks), len(downloadReq.ids)) - metadata, err := s.client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk}) - if err != nil { - logrus.WithError(err).Errorf("Failed to download message metadata for chunk %v", i) - downloadReq.err = err - select { - case downloadCh <- downloadReq: - case <-ctx.Done(): - return - } - return - } - - if ctx.Err() != nil { - return - } - - // Build look up table so that messages are processed in the same order. - metadataMap := make(map[string]int, len(metadata)) - for i, v := range metadata { - metadataMap[v.ID] = i - } - - for i, id := range metadataChunk { - m := &metadata[metadataMap[id]] - nextSize := downloadReq.expectedSize + uint64(m.Size) - if nextSize >= syncMaxDownloadRequestMem || len(downloadReq.ids) >= 256 { - logrus.Debugf("Download Request Sent at %v of %v", i, len(metadata)) - select { - case downloadCh <- downloadReq: - - case <-ctx.Done(): - return - } - downloadReq.expectedSize = 0 - downloadReq.ids = make([]string, 0, MetadataDataPageSize) - nextSize = uint64(m.Size) - } - downloadReq.ids = append(downloadReq.ids, id) - downloadReq.expectedSize = nextSize - } - } - - if len(downloadReq.ids) != 0 { - logrus.Debugf("Sending remaining download request") - select { - case downloadCh <- downloadReq: - - case <-ctx.Done(): - return - } - } - }, logging.Labels{"sync-stage": "meta-data"}) - - return downloadCh -} - -func startMessageDownloader(ctx context.Context, s *syncJob, syncLimits syncLimits, downloadCh <-chan downloadRequest) (<-chan downloadedMessageBatch, <-chan error) { - buildCh := make(chan downloadedMessageBatch) - errorCh := make(chan error, syncLimits.MaxParallelDownloads*4) - - // Goroutine in charge of downloading and building messages in maxBatchSize batches. - async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { - defer close(buildCh) - defer close(errorCh) - defer func() { - logrus.Debugf("sync downloader exit") - }() - - attachmentDownloader := s.newAttachmentDownloader(ctx, s.client, syncLimits.MaxParallelDownloads) - defer attachmentDownloader.close() - - for request := range downloadCh { - logrus.Debugf("Download request: %v MB:%v", len(request.ids), toMB(request.expectedSize)) - if request.err != nil { - errorCh <- request.err - return - } - - if ctx.Err() != nil { - errorCh <- ctx.Err() - return - } - - result, err := parallel.MapContext(ctx, syncLimits.MaxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) { - defer async.HandlePanic(s.panicHandler) - - var result proton.FullMessage - - msg, err := s.client.GetMessage(ctx, id) - if err != nil { - logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message") - return proton.FullMessage{}, err - } - - attachments, err := attachmentDownloader.getAttachments(ctx, msg.Attachments) - if err != nil { - logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message attachments") - return proton.FullMessage{}, err - } - - result.Message = msg - result.AttData = attachments - - return result, nil - }) - if err != nil { - errorCh <- err - return - } - - select { - case buildCh <- downloadedMessageBatch{ - batch: result, - }: - - case <-ctx.Done(): - return - } - } - }, logging.Labels{"sync-stage": "download"}) - - return buildCh, errorCh -} - -func startMessageBuilder(ctx context.Context, s *syncJob, buildCh <-chan downloadedMessageBatch, syncMaxMessageBuildingMem uint64) <-chan builtMessageBatch { - flushCh := make(chan builtMessageBatch) - - // Goroutine which builds messages after they have been downloaded - async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { - defer close(flushCh) - defer func() { - logrus.Debugf("sync builder exit") - }() - - if err := s.identityState.WithAddrKRs(s.keyProvider.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error { - maxMessagesInParallel := runtime.NumCPU() - - for buildBatch := range buildCh { - if ctx.Err() != nil { - return ctx.Err() - } - - chunks := chunkSyncBuilderBatch(buildBatch.batch, syncMaxMessageBuildingMem) - - for index, chunk := range chunks { - logrus.Debugf("Build request: %v of %v count=%v", index, len(chunks), len(chunk)) - - result, err := parallel.MapContext(ctx, maxMessagesInParallel, chunk, func(ctx context.Context, msg proton.FullMessage) (*buildRes, error) { - defer async.HandlePanic(s.panicHandler) - - kr, ok := addrKRs[msg.AddressID] - if !ok { - logrus.Errorf("Address '%v' on message '%v' does not have an unlocked kerying", msg.AddressID, msg.ID) - return &buildRes{ - messageID: msg.ID, - addressID: msg.AddressID, - err: fmt.Errorf("address does not have an unlocked keyring"), - }, nil - } - - res := buildRFC822(s.labels, msg, kr, new(bytes.Buffer)) - if res.err != nil { - s.log.WithError(res.err).WithField("msgID", msg.ID).Error("Failed to build message (syn)") - } - - return res, nil - }) - if err != nil { - return err - } - - select { - case flushCh <- builtMessageBatch{result}: - - case <-ctx.Done(): - return nil - } - } - } - - return nil - }); err != nil { - s.log.WithError(err).Error("Sync message builder exited with error") - } - }, logging.Labels{"sync-stage": "builder"}) - - return flushCh -} - -func startMessageFlusher(ctx context.Context, s *syncJob, messageBatchCH <-chan builtMessageBatch) <-chan flushUpdate { - flushUpdateCh := make(chan flushUpdate) - - // Goroutine which converts the messages into updates and builds a waitable structure for progress tracking. - async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { - defer close(flushUpdateCh) - defer func() { - logrus.Debugf("sync flush exit") - }() - - type updateTargetInfo struct { - queueIndex int - ch updatePublisher - } - - pendingUpdates := make([][]*imap.MessageCreated, len(s.updaters)) - addressToIndex := make(map[string]updateTargetInfo) - - { - i := 0 - for addrID, updateCh := range s.updaters { - addressToIndex[addrID] = updateTargetInfo{ - ch: updateCh, - queueIndex: i, - } - i++ - } - } - - for downloadBatch := range messageBatchCH { - logrus.Debugf("Flush batch: %v", len(downloadBatch.batch)) - for _, res := range downloadBatch.batch { - if res.err != nil { - if err := s.syncState.AddFailedMessageID(res.messageID); err != nil { - logrus.WithError(err).Error("Failed to add failed message ID") - } - - if err := s.reporter.ReportMessageWithContext("Failed to build message (sync)", reporter.Context{ - "messageID": res.messageID, - "error": res.err, - }); err != nil { - s.log.WithError(err).Error("Failed to report message build error") - } - - // We could sync a placeholder message here, but for now we skip it entirely. - continue - } - - if err := s.syncState.RemFailedMessageID(res.messageID); err != nil { - logrus.WithError(err).Error("Failed to remove failed message ID") - } - - targetInfo := addressToIndex[res.addressID] - pendingUpdates[targetInfo.queueIndex] = append(pendingUpdates[targetInfo.queueIndex], res.update) - } - - for _, info := range addressToIndex { - up := imap.NewMessagesCreated(true, pendingUpdates[info.queueIndex]...) - info.ch.publishUpdate(ctx, up) - - err, ok := up.WaitContext(ctx) - if ok && err != nil { - flushUpdateCh <- flushUpdate{ - err: fmt.Errorf("failed to apply sync update to gluon %v: %w", up.String(), err), - } - return - } - - pendingUpdates[info.queueIndex] = pendingUpdates[info.queueIndex][:0] - } - - select { - case flushUpdateCh <- flushUpdate{ - messageID: downloadBatch.batch[0].messageID, - err: nil, - batchLen: len(downloadBatch.batch), - }: - case <-ctx.Done(): - return - } - } - }, logging.Labels{"sync-stage": "flush"}) - - return flushUpdateCh -} diff --git a/internal/services/imapservice/sync_reporter.go b/internal/services/imapservice/sync_reporter.go index 7adda6ca..5eef0519 100644 --- a/internal/services/imapservice/sync_reporter.go +++ b/internal/services/imapservice/sync_reporter.go @@ -29,25 +29,32 @@ type syncReporter struct { eventPublisher events.EventPublisher start time.Time - total int - count int + total int64 + count int64 last time.Time freq time.Duration } -func newSyncReporter(userID string, eventsPublisher events.EventPublisher, total int, freq time.Duration) *syncReporter { - return &syncReporter{ - userID: userID, - eventPublisher: eventsPublisher, - - start: time.Now(), - total: total, - freq: freq, - } +func (rep *syncReporter) OnStart(ctx context.Context) { + rep.start = time.Now() + rep.eventPublisher.PublishEvent(ctx, events.SyncStarted{UserID: rep.userID}) } -func (rep *syncReporter) add(ctx context.Context, delta int) { +func (rep *syncReporter) OnFinished(ctx context.Context) { + rep.eventPublisher.PublishEvent(ctx, events.SyncFinished{ + UserID: rep.userID, + }) +} + +func (rep *syncReporter) OnError(ctx context.Context, err error) { + rep.eventPublisher.PublishEvent(ctx, events.SyncFailed{ + UserID: rep.userID, + Error: err, + }) +} + +func (rep *syncReporter) OnProgress(ctx context.Context, delta int64) { rep.count += delta if time.Since(rep.last) > rep.freq { @@ -62,11 +69,17 @@ func (rep *syncReporter) add(ctx context.Context, delta int) { } } -func (rep *syncReporter) done(ctx context.Context) { - rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{ - UserID: rep.userID, - Progress: 1, - Elapsed: time.Since(rep.start), - Remaining: 0, - }) +func (rep *syncReporter) InitializeProgressCounter(_ context.Context, current int64, total int64) { + rep.count = current + rep.total = total +} + +func newSyncReporter(userID string, eventsPublisher events.EventPublisher, freq time.Duration) *syncReporter { + return &syncReporter{ + userID: userID, + eventPublisher: eventsPublisher, + + start: time.Now(), + freq: freq, + } } diff --git a/internal/services/imapservice/sync_state_provider.go b/internal/services/imapservice/sync_state_provider.go new file mode 100644 index 00000000..a36ae7c9 --- /dev/null +++ b/internal/services/imapservice/sync_state_provider.go @@ -0,0 +1,253 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "sync" + + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" + "github.com/bradenaw/juniper/xmaps" +) + +type SyncState struct { + filePath string + status syncservice.Status + lock sync.Mutex +} + +var ErrInvalidSyncFileVersion = errors.New("invalid sync file version") + +const SyncFileVersion = 1 + +type syncStateFile struct { + Version int + Data string +} + +type syncFileVersion1 struct { + Status syncservice.Status +} + +func NewSyncState(filePath string) (*SyncState, error) { + s := &SyncState{filePath: filePath, status: syncservice.DefaultStatus()} + + if err := s.loadUnsafe(); err != nil { + return nil, err + } + + return s, nil +} + +func (s *SyncState) AddFailedMessageID(_ context.Context, ids ...string) error { + s.lock.Lock() + defer s.lock.Unlock() + + count := len(s.status.FailedMessages) + + for _, id := range ids { + s.status.FailedMessages.Add(id) + } + + // Only update if something change. + if count == len(s.status.FailedMessages) { + return nil + } + + return s.storeUnsafe() +} + +func (s *SyncState) RemFailedMessageID(_ context.Context, ids ...string) error { + s.lock.Lock() + defer s.lock.Unlock() + + count := len(s.status.FailedMessages) + + for _, id := range ids { + s.status.FailedMessages.Remove(id) + } + + // Only update if something change. + if count == len(s.status.FailedMessages) { + return nil + } + + return s.storeUnsafe() +} + +func (s *SyncState) GetSyncStatus(_ context.Context) (syncservice.Status, error) { + s.lock.Lock() + defer s.lock.Unlock() + + return s.status, nil +} + +func (s *SyncState) ClearSyncStatus(_ context.Context) error { + s.lock.Lock() + defer s.lock.Unlock() + + oldStatus := s.status + + s.status = syncservice.DefaultStatus() + + if err := s.storeUnsafe(); err != nil { + s.status = oldStatus + return err + } + + return nil +} + +func (s *SyncState) SetHasLabels(_ context.Context, b bool) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.status.HasLabels = b + + return s.storeUnsafe() +} + +func (s *SyncState) SetHasMessages(_ context.Context, b bool) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.status.HasMessages = b + + return s.storeUnsafe() +} + +func (s *SyncState) SetLastMessageID(_ context.Context, s2 string, i int64) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.status.LastSyncedMessageID = s2 + s.status.NumSyncedMessages += i + + return s.storeUnsafe() +} + +func (s *SyncState) SetMessageCount(_ context.Context, i int64) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.status.TotalMessageCount = i + s.status.HasMessageCount = true + + return s.storeUnsafe() +} + +func (s *SyncState) storeUnsafe() error { + return storeImpl(&s.status, s.filePath) +} + +func storeImpl(status *syncservice.Status, path string) error { + data, err := json.Marshal(syncFileVersion1{Status: *status}) + if err != nil { + return fmt.Errorf("failed to marshal sync state data: %w", err) + } + + syncFile := syncStateFile{ + Version: SyncFileVersion, + Data: string(data), + } + + syncFileData, err := json.Marshal(syncFile) + if err != nil { + return fmt.Errorf("failde to marshal sync state file: %w", err) + } + + tmpFile := path + ".tmp" + + if err := os.WriteFile(tmpFile, syncFileData, 0o600); err != nil { + return fmt.Errorf("failed to write sync state to tmp file: %w", err) + } + + if err := os.Rename(tmpFile, path); err != nil { + return fmt.Errorf("failed to update sync state: %w", err) + } + + return nil +} + +func (s *SyncState) loadUnsafe() error { + data, err := os.ReadFile(s.filePath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + + return err + } + + var syncFile syncStateFile + + if err := json.Unmarshal(data, &syncFile); err != nil { + return fmt.Errorf("failed to unmarshal sync file: %w", err) + } + + if syncFile.Version != SyncFileVersion { + return ErrInvalidSyncFileVersion + } + + var v1 syncFileVersion1 + + if err := json.Unmarshal([]byte(syncFile.Data), &v1); err != nil { + return fmt.Errorf("failed to unmarshal sync data: %w", err) + } + + s.status = v1.Status + + return nil +} + +func DeleteSyncState(configDir, userID string) error { + path := getSyncConfigPath(configDir, userID) + + return os.Remove(path) +} + +func MigrateVaultSettings( + configDir, userID string, + hasLabels, hasMessages bool, + failedMessageIDs []string, +) (bool, error) { + filePath := getSyncConfigPath(configDir, userID) + + _, err := os.ReadFile(filePath) //nolint:gosec + if err == nil { + // File already exists, sync has been migrated. + return false, nil + } + + if err != nil && !errors.Is(err, os.ErrNotExist) { + // unexpected error occurred. + return false, err + } + + status := syncservice.DefaultStatus() + status.HasLabels = hasLabels + status.HasMessages = hasMessages + status.HasMessageCount = hasMessages + status.FailedMessages = xmaps.SetFromSlice(failedMessageIDs) + + return true, storeImpl(&status, filePath) +} diff --git a/internal/services/imapservice/sync_state_provider_test.go b/internal/services/imapservice/sync_state_provider_test.go new file mode 100644 index 00000000..0852c53f --- /dev/null +++ b/internal/services/imapservice/sync_state_provider_test.go @@ -0,0 +1,80 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "testing" + + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" + "github.com/bradenaw/juniper/xmaps" + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" +) + +func TestMigrateSyncSettings_AlreadyExists(t *testing.T) { + tmpDir := t.TempDir() + testFile := getSyncConfigPath(tmpDir, "test") + + expected, err := generateTestState(testFile) + require.NoError(t, err) + + migrated, err := MigrateVaultSettings(tmpDir, "test", true, true, nil) + require.NoError(t, err) + require.False(t, migrated) + + state, err := NewSyncState(testFile) + require.NoError(t, err) + status, err := state.GetSyncStatus(context.Background()) + require.NoError(t, err) + require.Equal(t, expected, status) +} + +func TestMigrateSyncSettings_DoesNotExist(t *testing.T) { + tmpDir := t.TempDir() + + failedIDs := []string{"foo", "bar"} + migrated, err := MigrateVaultSettings(tmpDir, "test", true, true, failedIDs) + require.NoError(t, err) + require.True(t, migrated) + + state, err := NewSyncState(getSyncConfigPath(tmpDir, "test")) + require.NoError(t, err) + status, err := state.GetSyncStatus(context.Background()) + require.NoError(t, err) + require.Zero(t, status.NumSyncedMessages) + require.Zero(t, status.TotalMessageCount) + require.Empty(t, status.LastSyncedMessageID) + require.ElementsMatch(t, failedIDs, maps.Keys(status.FailedMessages)) + require.True(t, status.HasLabels) + require.True(t, status.HasMessageCount) + require.True(t, status.HasMessages) +} + +func generateTestState(path string) (syncservice.Status, error) { + status := syncservice.DefaultStatus() + + status.HasMessages = true + status.HasLabels = false + status.FailedMessages = xmaps.SetFromSlice([]string{"foo", "bar"}) + status.TotalMessageCount = 1204 + status.NumSyncedMessages = 100 + status.HasMessages = true + + return status, storeImpl(&status, path) +} diff --git a/internal/services/imapservice/sync_update_applier.go b/internal/services/imapservice/sync_update_applier.go new file mode 100644 index 00000000..a202ddbf --- /dev/null +++ b/internal/services/imapservice/sync_update_applier.go @@ -0,0 +1,234 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + + "github.com/ProtonMail/gluon/imap" + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" + "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" + "github.com/bradenaw/juniper/xslices" + "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" +) + +type SyncUpdateApplier struct { + requestCh chan updateRequest + replyCh chan updateReply +} + +type updateReply struct { + updates []imap.Update + err error +} + +type updateRequest = func(ctx context.Context, mode usertypes.AddressMode, connectors map[string]*Connector) ([]imap.Update, error) + +func NewSyncUpdateApplier() *SyncUpdateApplier { + return &SyncUpdateApplier{ + requestCh: make(chan updateRequest), + replyCh: make(chan updateReply), + } +} + +func (s *SyncUpdateApplier) Close() { + close(s.requestCh) + close(s.replyCh) +} + +func (s *SyncUpdateApplier) ApplySyncUpdates(ctx context.Context, updates []syncservice.BuildResult) error { + request := func(ctx context.Context, mode usertypes.AddressMode, connectors map[string]*Connector) ([]imap.Update, error) { + if mode == usertypes.AddressModeCombined { + if len(connectors) != 1 { + return nil, fmt.Errorf("unexpected connecto list state") + } + + c := maps.Values(connectors)[0] + + update := imap.NewMessagesCreated(true, xslices.Map(updates, func(b syncservice.BuildResult) *imap.MessageCreated { + return b.Update + })...) + + c.publishUpdate(ctx, update) + + return []imap.Update{update}, nil + } + + updateMap := make(map[string]*imap.MessagesCreated, len(connectors)) + result := make([]imap.Update, 0, len(connectors)) + + for _, up := range updates { + update, ok := updateMap[up.AddressID] + if !ok { + update = imap.NewMessagesCreated(true) + updateMap[up.AddressID] = update + result = append(result, update) + } + + update.Messages = append(update.Messages, up.Update) + } + + for addrID, update := range updateMap { + c, ok := connectors[addrID] + if !ok { + logrus.Warnf("Could not find connector for address %v", addrID) + continue + } + + c.publishUpdate(ctx, update) + } + + return result, nil + } + + result, err := s.sendRequest(ctx, request) + if err != nil { + return err + } + + if err := waitOnIMAPUpdates(ctx, result); err != nil { + return fmt.Errorf("could not apply updates: %w", err) + } + + return nil +} + +func (s *SyncUpdateApplier) SyncSystemLabelsOnly(ctx context.Context, labels map[string]proton.Label) error { + request := func(ctx context.Context, _ usertypes.AddressMode, connectors map[string]*Connector) ([]imap.Update, error) { + updates := make([]imap.Update, 0, len(labels)*len(connectors)) + for _, label := range labels { + if !WantLabel(label) { + continue + } + + for _, c := range connectors { + update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name) + updates = append(updates, update) + c.publishUpdate(ctx, update) + } + } + return updates, nil + } + + updates, err := s.sendRequest(ctx, request) + if err != nil { + return err + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return fmt.Errorf("could not sync system labels: %w", err) + } + + return nil +} + +func (s *SyncUpdateApplier) SyncLabels(ctx context.Context, labels map[string]proton.Label) error { + request := func(ctx context.Context, _ usertypes.AddressMode, connectors map[string]*Connector) ([]imap.Update, error) { + return syncLabels(ctx, labels, maps.Values(connectors)) + } + + updates, err := s.sendRequest(ctx, request) + if err != nil { + return err + } + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return fmt.Errorf("could not sync labels: %w", err) + } + + return nil +} + +// nolint:exhaustive +func syncLabels(ctx context.Context, labels map[string]proton.Label, connectors []*Connector) ([]imap.Update, error) { + var updates []imap.Update + + // Create placeholder Folders/Labels mailboxes with the \Noselect attribute. + for _, prefix := range []string{folderPrefix, labelPrefix} { + for _, updateCh := range connectors { + update := newPlaceHolderMailboxCreatedUpdate(prefix) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + } + + // Sync the user's labels. + for labelID, label := range labels { + if !WantLabel(label) { + continue + } + + switch label.Type { + case proton.LabelTypeSystem: + for _, updateCh := range connectors { + update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + + case proton.LabelTypeFolder, proton.LabelTypeLabel: + for _, updateCh := range connectors { + update := newMailboxCreatedUpdate(imap.MailboxID(labelID), GetMailboxName(label)) + updateCh.publishUpdate(ctx, update) + updates = append(updates, update) + } + + default: + return nil, fmt.Errorf("unknown label type: %d", label.Type) + } + } + + return updates, nil +} + +func (s *SyncUpdateApplier) sendRequest(ctx context.Context, request updateRequest) ([]imap.Update, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case s.requestCh <- request: + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case reply, ok := <-s.replyCh: + if !ok { + return nil, fmt.Errorf("no reply") + } + + if reply.err != nil { + return nil, reply.err + } + + return reply.updates, nil + } +} + +func (s *SyncUpdateApplier) reply(ctx context.Context, updates []imap.Update, err error) error { + select { + case <-ctx.Done(): + return ctx.Err() + case s.replyCh <- updateReply{ + updates: updates, + err: err, + }: + return nil + } +} diff --git a/internal/services/imapsmtpserver/service.go b/internal/services/imapsmtpserver/service.go index 2b575cc4..7eb0f453 100644 --- a/internal/services/imapsmtpserver/service.go +++ b/internal/services/imapsmtpserver/service.go @@ -32,6 +32,7 @@ import ( "github.com/ProtonMail/proton-bridge/v3/internal/events" "github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice" bridgesmtp "github.com/ProtonMail/proton-bridge/v3/internal/services/smtp" + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" "github.com/ProtonMail/proton-bridge/v3/pkg/cpc" "github.com/emersion/go-smtp" "github.com/sirupsen/logrus" @@ -134,7 +135,7 @@ func (sm *Service) AddIMAPUser( connector connector.Connector, addrID string, idProvider imapservice.GluonIDProvider, - syncStateProvider imapservice.SyncStateProvider, + syncStateProvider syncservice.StateProvider, ) error { _, err := sm.requests.Send(ctx, &smRequestAddIMAPUser{ connector: connector, @@ -302,7 +303,7 @@ func (sm *Service) handleAddIMAPUser(ctx context.Context, connector connector.Connector, addrID string, idProvider imapservice.GluonIDProvider, - syncStateProvider imapservice.SyncStateProvider, + syncStateProvider syncservice.StateProvider, ) error { // Due to the many different error exits, performer user count change at this stage rather we split the incrementing // of users from the logic. @@ -318,7 +319,7 @@ func (sm *Service) handleAddIMAPUserImpl(ctx context.Context, connector connector.Connector, addrID string, idProvider imapservice.GluonIDProvider, - syncStateProvider imapservice.SyncStateProvider, + syncStateProvider syncservice.StateProvider, ) error { if sm.imapServer == nil { return fmt.Errorf("no imap server instance running") @@ -348,7 +349,7 @@ func (sm *Service) handleAddIMAPUserImpl(ctx context.Context, } // Clear the sync status -- we need to resync all messages. - if err := syncStateProvider.ClearSyncStatus(); err != nil { + if err := syncStateProvider.ClearSyncStatus(ctx); err != nil { return fmt.Errorf("failed to clear sync status: %w", err) } @@ -358,7 +359,14 @@ func (sm *Service) handleAddIMAPUserImpl(ctx context.Context, } else if isNew { panic("IMAP user should already have a database") } - } else if status := syncStateProvider.GetSyncStatus(); !status.HasLabels { + } + + status, err := syncStateProvider.GetSyncStatus(ctx) + if err != nil { + return fmt.Errorf("failed to get sync status: %w", err) + } + + if !status.HasLabels { // Otherwise, the DB already exists -- if the labels are not yet synced, we need to re-create the DB. if err := sm.imapServer.RemoveUser(ctx, gluonID, true); err != nil { return fmt.Errorf("failed to remove old IMAP user: %w", err) @@ -710,7 +718,7 @@ type smRequestAddIMAPUser struct { connector connector.Connector addrID string idProvider imapservice.GluonIDProvider - syncStateProvider imapservice.SyncStateProvider + syncStateProvider syncservice.StateProvider } type smRequestRemoveIMAPUser struct { diff --git a/internal/user/debug.go b/internal/user/debug.go index a5f733c2..8ab86bbc 100644 --- a/internal/user/debug.go +++ b/internal/user/debug.go @@ -138,7 +138,10 @@ func (apm DiagnosticMetadata) BuildMailboxToMessageMap(ctx context.Context, user } func (user *User) GetDiagnosticMetadata(ctx context.Context) (DiagnosticMetadata, error) { - failedMessages := xmaps.SetFromSlice(user.vault.SyncStatus().FailedMessageIDs) + failedMessages, err := user.imapService.GetSyncFailedMessageIDs(ctx) + if err != nil { + return DiagnosticMetadata{}, err + } messageIDs, err := user.client.GetAllMessageIDs(ctx, "") if err != nil { @@ -159,7 +162,7 @@ func (user *User) GetDiagnosticMetadata(ctx context.Context) (DiagnosticMetadata return DiagnosticMetadata{ MessageIDs: messageIDs, Metadata: meta, - FailedMessageIDs: failedMessages, + FailedMessageIDs: xmaps.SetFromSlice(failedMessages), }, nil } diff --git a/internal/user/user.go b/internal/user/user.go index 5274e2e5..476d36a2 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -34,6 +34,7 @@ import ( "github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks" "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder" "github.com/ProtonMail/proton-bridge/v3/internal/services/smtp" + "github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice" telemetryservice "github.com/ProtonMail/proton-bridge/v3/internal/services/telemetry" "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents" "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" @@ -102,6 +103,8 @@ func New( imapServerManager imapservice.IMAPServerManager, smtpServerManager smtp.ServerManager, eventSubscription events.Subscription, + syncService syncservice.Regulator, + syncConfigDir string, ) (*User, error) { user, err := newImpl( ctx, @@ -117,6 +120,8 @@ func New( imapServerManager, smtpServerManager, eventSubscription, + syncService, + syncConfigDir, ) if err != nil { // Cleanup any pending resources on error @@ -145,9 +150,27 @@ func newImpl( imapServerManager imapservice.IMAPServerManager, smtpServerManager smtp.ServerManager, eventSubscription events.Subscription, + syncService syncservice.Regulator, + syncConfigDir string, ) (*User, error) { logrus.WithField("userID", apiUser.ID).Info("Creating new user") + // Migrate Sync Status from Vault. + { + syncStatus := encVault.SyncStatus() + + migrated, err := imapservice.MigrateVaultSettings(syncConfigDir, apiUser.ID, syncStatus.HasLabels, syncStatus.HasMessages, syncStatus.FailedMessageIDs) + if err != nil { + return nil, fmt.Errorf("failed to migrate user sync settings: %w", err) + } + + if migrated { + if err := encVault.ClearSyncStatus(); err != nil { + return nil, fmt.Errorf("failed to clear sync settings from vault: %w", err) + } + } + } + // Get the user's API addresses. apiAddrs, err := client.GetAddresses(ctx) if err != nil { @@ -238,7 +261,6 @@ func newImpl( client, identityState.Clone(), user, - encVault, user.eventService, imapServerManager, user, @@ -250,6 +272,7 @@ func newImpl( reporter, addressMode, eventSubscription, + syncConfigDir, user.maxSyncMemory, showAllMail, ) @@ -299,7 +322,7 @@ func newImpl( } // Start IMAP Service - if err := user.imapService.Start(ctx, user.serviceGroup); err != nil { + if err := user.imapService.Start(ctx, user.serviceGroup, syncService); err != nil { return user, fmt.Errorf("failed to start imap service: %w", err) } diff --git a/internal/user/user_test.go b/internal/user/user_test.go index 6169447b..330fb791 100644 --- a/internal/user/user_test.go +++ b/internal/user/user_test.go @@ -156,6 +156,8 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma nullIMAPServerManager, nullSMTPServerManager, nullEventSubscription, + nil, + "", ) require.NoError(tb, err) defer user.Close()