From 7a192d50db2532e7e8662d62043100a0328b4112 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Thu, 31 Aug 2023 10:47:47 +0200 Subject: [PATCH] feat(GODT-2891): Allow message create & delete during sync Incoming messages which arrive into labels we know during sync are now presented to the IMAP clients. We also allow messages to be deleted while syncing if deleted on other clients. Other operations such as moving, marking messages as read and label operations need to be considered in a follow up patch as they are far more complex. --- internal/bridge/sync_test.go | 75 +++++++++++++++++ internal/services/imapservice/service.go | 9 +- .../imapservice/service_message_events.go | 13 ++- .../imapservice/service_sync_events.go | 82 +++++++++++++++++++ 4 files changed, 172 insertions(+), 7 deletions(-) create mode 100644 internal/services/imapservice/service_sync_events.go diff --git a/internal/bridge/sync_test.go b/internal/bridge/sync_test.go index 24849089..1d613bf3 100644 --- a/internal/bridge/sync_test.go +++ b/internal/bridge/sync_test.go @@ -504,6 +504,81 @@ func TestBridge_EventReplayAfterSyncHasFinished(t *testing.T) { }, server.WithTLS(false)) } +func TestBridge_MessageCreateDuringSync(t *testing.T) { + numMsg := 1 << 8 + + withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) { + userID, addrID, err := s.CreateUser("imap", password) + require.NoError(t, err) + + labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder) + require.NoError(t, err) + + withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) { + createNumMessages(ctx, t, c, addrID, labelID, numMsg) + }) + + var allowSyncToProgress atomic.Bool + allowSyncToProgress.Store(false) + + // Simulate 429 to prevent sync from progressing. + s.AddStatusHook(func(request *http.Request) (int, bool) { + if request.Method == "GET" && strings.Contains(request.URL.Path, "/mail/v4/messages/") { + if !allowSyncToProgress.Load() { + return http.StatusTooManyRequests, true + } + } + + return 0, false + }) + + // The initial user should be fully synced. + withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) { + syncStartedCh, syncStartedDone := chToType[events.Event, events.SyncStarted](bridge.GetEvents(events.SyncStarted{})) + defer syncStartedDone() + + addressCreatedCh, addressCreatedDone := chToType[events.Event, events.UserAddressCreated](bridge.GetEvents(events.UserAddressCreated{})) + defer addressCreatedDone() + + userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil) + require.NoError(t, err) + + require.Equal(t, userID, (<-syncStartedCh).UserID) + + // create 20 more messages and move them to inbox + withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) { + createNumMessages(ctx, t, c, addrID, proton.InboxLabel, 20) + }) + + // User AddrID2 event as a check point to see when the new address was created. + addrID, err := s.CreateAddress(userID, "bar@proton.ch", password) + require.NoError(t, err) + + // At most two events can be published, one for the first address, then for the second. + // if the second event is not `addrID` then something went wrong. + event := <-addressCreatedCh + require.Equal(t, addrID, event.AddressID) + allowSyncToProgress.Store(true) + + info, err := bridge.GetUserInfo(userID) + require.NoError(t, err) + + client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, bridge.GetIMAPPort())) + require.NoError(t, err) + require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass))) + defer func() { _ = client.Logout() }() + + require.Eventually(t, func() bool { + // Finally check if the 20 messages are in INBOX. + status, err := client.Status("INBOX", []imap.StatusItem{imap.StatusMessages}) + require.NoError(t, err) + + return uint32(20) == status.Messages + }, 10*time.Second, time.Second) + }) + }, server.WithTLS(false)) +} + func withClient(ctx context.Context, t *testing.T, s *server.Server, username string, password []byte, fn func(context.Context, *proton.Client)) { //nolint:unparam m := proton.New( proton.WithHostURL(s.GetHostURL()), diff --git a/internal/services/imapservice/service.go b/internal/services/imapservice/service.go index 18498073..d7c2fba6 100644 --- a/internal/services/imapservice/service.go +++ b/internal/services/imapservice/service.go @@ -324,6 +324,8 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo MessageHandler: s, } + syncEventHandler := s.newSyncEventHandler() + s.eventProvider.Subscribe(s.subscription) defer s.eventProvider.Unsubscribe(s.subscription) @@ -441,12 +443,13 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo } e.Consume(func(event proton.Event) error { if s.isSyncing { + if err := syncEventHandler.OnEvent(ctx, event); err != nil { + return err + } + // We need to reset the sync if we receive a refresh event during a sync and update // the last event id to avoid problems. if event.Refresh&proton.RefreshMail != 0 { - if err := s.HandleRefreshEvent(ctx, 0); err != nil { - return err - } s.lastHandledEventID = event.EventID } diff --git a/internal/services/imapservice/service_message_events.go b/internal/services/imapservice/service_message_events.go index dfda43a4..6a6cd0af 100644 --- a/internal/services/imapservice/service_message_events.go +++ b/internal/services/imapservice/service_message_events.go @@ -44,7 +44,7 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa switch event.Action { case proton.EventCreate: - updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message) + updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message, false) if err != nil { reportError(s.reporter, s.log, "Failed to apply create message event", err) return fmt.Errorf("failed to handle create message event: %w", err) @@ -93,7 +93,7 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) { s.log.WithError(err).Error("Failed to handle update message event in gluon, will try creating it") - updates, err := onMessageCreated(ctx, s, event.Message) + updates, err := onMessageCreated(ctx, s, event.Message, false) if err != nil { return fmt.Errorf("failed to handle update message event as create: %w", err) } @@ -121,7 +121,12 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa return nil } -func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) { +func onMessageCreated( + ctx context.Context, + s *Service, + message proton.MessageMetadata, + allowUnknownLabels bool, +) ([]imap.Update, error) { s.log.WithFields(logrus.Fields{ "messageID": message.ID, "subject": logging.Sensitive(message.Subject), @@ -161,7 +166,7 @@ func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMet s.log.WithError(err).Error("Failed to remove failed message ID from vault") } - update = imap.NewMessagesCreated(false, res.update) + update = imap.NewMessagesCreated(allowUnknownLabels, res.update) didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update) if err != nil { return err diff --git a/internal/services/imapservice/service_sync_events.go b/internal/services/imapservice/service_sync_events.go new file mode 100644 index 00000000..fa376f1f --- /dev/null +++ b/internal/services/imapservice/service_sync_events.go @@ -0,0 +1,82 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package imapservice + +import ( + "context" + "fmt" + + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/proton-bridge/v3/internal/logging" + "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents" +) + +func (s *Service) newSyncEventHandler() userevents.EventHandler { + return userevents.EventHandler{ + RefreshHandler: s, + AddressHandler: s, + UserHandler: s, + LabelHandler: nil, + MessageHandler: &syncMessageEventHandler{service: s}, + UsedSpaceHandler: nil, + UserSettingsHandler: nil, + } +} + +type syncMessageEventHandler struct { + service *Service +} + +func (s syncMessageEventHandler) HandleMessageEvents(ctx context.Context, events []proton.MessageEvent) error { + s.service.log.Debug("handling message events (sync)") + for _, event := range events { + //nolint:exhaustive + switch event.Action { + case proton.EventCreate: + updates, err := onMessageCreated( + logging.WithLogrusField(ctx, "action", "create message (sync)"), + s.service, + event.Message, + true, + ) + if err != nil { + reportError(s.service.reporter, s.service.log, "Failed to apply create message event", err) + return fmt.Errorf("failed to handle create message event: %w", err) + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + + case proton.EventDelete: + updates := onMessageDeleted( + logging.WithLogrusField(ctx, "action", "delete message (sync)"), + s.service, + event, + ) + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return fmt.Errorf("failed to handle delete message event in gluon: %w", err) + } + default: + continue + } + } + + return nil +}