diff --git a/internal/bridge/sync_test.go b/internal/bridge/sync_test.go index 24849089..1d613bf3 100644 --- a/internal/bridge/sync_test.go +++ b/internal/bridge/sync_test.go @@ -504,6 +504,81 @@ func TestBridge_EventReplayAfterSyncHasFinished(t *testing.T) { }, server.WithTLS(false)) } +func TestBridge_MessageCreateDuringSync(t *testing.T) { + numMsg := 1 << 8 + + withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) { + userID, addrID, err := s.CreateUser("imap", password) + require.NoError(t, err) + + labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder) + require.NoError(t, err) + + withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) { + createNumMessages(ctx, t, c, addrID, labelID, numMsg) + }) + + var allowSyncToProgress atomic.Bool + allowSyncToProgress.Store(false) + + // Simulate 429 to prevent sync from progressing. + s.AddStatusHook(func(request *http.Request) (int, bool) { + if request.Method == "GET" && strings.Contains(request.URL.Path, "/mail/v4/messages/") { + if !allowSyncToProgress.Load() { + return http.StatusTooManyRequests, true + } + } + + return 0, false + }) + + // The initial user should be fully synced. + withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) { + syncStartedCh, syncStartedDone := chToType[events.Event, events.SyncStarted](bridge.GetEvents(events.SyncStarted{})) + defer syncStartedDone() + + addressCreatedCh, addressCreatedDone := chToType[events.Event, events.UserAddressCreated](bridge.GetEvents(events.UserAddressCreated{})) + defer addressCreatedDone() + + userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil) + require.NoError(t, err) + + require.Equal(t, userID, (<-syncStartedCh).UserID) + + // create 20 more messages and move them to inbox + withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) { + createNumMessages(ctx, t, c, addrID, proton.InboxLabel, 20) + }) + + // User AddrID2 event as a check point to see when the new address was created. + addrID, err := s.CreateAddress(userID, "bar@proton.ch", password) + require.NoError(t, err) + + // At most two events can be published, one for the first address, then for the second. + // if the second event is not `addrID` then something went wrong. + event := <-addressCreatedCh + require.Equal(t, addrID, event.AddressID) + allowSyncToProgress.Store(true) + + info, err := bridge.GetUserInfo(userID) + require.NoError(t, err) + + client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, bridge.GetIMAPPort())) + require.NoError(t, err) + require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass))) + defer func() { _ = client.Logout() }() + + require.Eventually(t, func() bool { + // Finally check if the 20 messages are in INBOX. + status, err := client.Status("INBOX", []imap.StatusItem{imap.StatusMessages}) + require.NoError(t, err) + + return uint32(20) == status.Messages + }, 10*time.Second, time.Second) + }) + }, server.WithTLS(false)) +} + func withClient(ctx context.Context, t *testing.T, s *server.Server, username string, password []byte, fn func(context.Context, *proton.Client)) { //nolint:unparam m := proton.New( proton.WithHostURL(s.GetHostURL()), diff --git a/internal/services/imapservice/service.go b/internal/services/imapservice/service.go index 18498073..d7c2fba6 100644 --- a/internal/services/imapservice/service.go +++ b/internal/services/imapservice/service.go @@ -324,6 +324,8 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo MessageHandler: s, } + syncEventHandler := s.newSyncEventHandler() + s.eventProvider.Subscribe(s.subscription) defer s.eventProvider.Unsubscribe(s.subscription) @@ -441,12 +443,13 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo } e.Consume(func(event proton.Event) error { if s.isSyncing { + if err := syncEventHandler.OnEvent(ctx, event); err != nil { + return err + } + // We need to reset the sync if we receive a refresh event during a sync and update // the last event id to avoid problems. if event.Refresh&proton.RefreshMail != 0 { - if err := s.HandleRefreshEvent(ctx, 0); err != nil { - return err - } s.lastHandledEventID = event.EventID } diff --git a/internal/services/imapservice/service_message_events.go b/internal/services/imapservice/service_message_events.go index dfda43a4..6a6cd0af 100644 --- a/internal/services/imapservice/service_message_events.go +++ b/internal/services/imapservice/service_message_events.go @@ -44,7 +44,7 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa switch event.Action { case proton.EventCreate: - updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message) + updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message, false) if err != nil { reportError(s.reporter, s.log, "Failed to apply create message event", err) return fmt.Errorf("failed to handle create message event: %w", err) @@ -93,7 +93,7 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) { s.log.WithError(err).Error("Failed to handle update message event in gluon, will try creating it") - updates, err := onMessageCreated(ctx, s, event.Message) + updates, err := onMessageCreated(ctx, s, event.Message, false) if err != nil { return fmt.Errorf("failed to handle update message event as create: %w", err) } @@ -121,7 +121,12 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa return nil } -func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) { +func onMessageCreated( + ctx context.Context, + s *Service, + message proton.MessageMetadata, + allowUnknownLabels bool, +) ([]imap.Update, error) { s.log.WithFields(logrus.Fields{ "messageID": message.ID, "subject": logging.Sensitive(message.Subject), @@ -161,7 +166,7 @@ func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMet s.log.WithError(err).Error("Failed to remove failed message ID from vault") } - update = imap.NewMessagesCreated(false, res.update) + update = imap.NewMessagesCreated(allowUnknownLabels, res.update) didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update) if err != nil { return err diff --git a/internal/services/imapservice/service_sync_events.go b/internal/services/imapservice/service_sync_events.go new file mode 100644 index 00000000..fa376f1f --- /dev/null +++ b/internal/services/imapservice/service_sync_events.go @@ -0,0 +1,82 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/proton-bridge/v3/internal/logging" + "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents" +) + +func (s *Service) newSyncEventHandler() userevents.EventHandler { + return userevents.EventHandler{ + RefreshHandler: s, + AddressHandler: s, + UserHandler: s, + LabelHandler: nil, + MessageHandler: &syncMessageEventHandler{service: s}, + UsedSpaceHandler: nil, + UserSettingsHandler: nil, + } +} + +type syncMessageEventHandler struct { + service *Service +} + +func (s syncMessageEventHandler) HandleMessageEvents(ctx context.Context, events []proton.MessageEvent) error { + s.service.log.Debug("handling message events (sync)") + for _, event := range events { + //nolint:exhaustive + switch event.Action { + case proton.EventCreate: + updates, err := onMessageCreated( + logging.WithLogrusField(ctx, "action", "create message (sync)"), + s.service, + event.Message, + true, + ) + if err != nil { + reportError(s.service.reporter, s.service.log, "Failed to apply create message event", err) + return fmt.Errorf("failed to handle create message event: %w", err) + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + + case proton.EventDelete: + updates := onMessageDeleted( + logging.WithLogrusField(ctx, "action", "delete message (sync)"), + s.service, + event, + ) + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return fmt.Errorf("failed to handle delete message event in gluon: %w", err) + } + default: + continue + } + } + + return nil +}