feat(GODT-2891): Allow message create & delete during sync

Incoming messages which arrive into labels we know during sync are now
presented to the IMAP clients.

We also allow messages to be deleted while syncing if deleted on other
clients.

Other operations such as moving, marking messages as read and label
operations need to be considered in a follow up patch as they are far
more complex.
This commit is contained in:
Leander Beernaert
2023-08-31 10:47:47 +02:00
parent a5a9bd762d
commit 7a192d50db
4 changed files with 172 additions and 7 deletions

View File

@ -504,6 +504,81 @@ func TestBridge_EventReplayAfterSyncHasFinished(t *testing.T) {
}, server.WithTLS(false))
}
func TestBridge_MessageCreateDuringSync(t *testing.T) {
numMsg := 1 << 8
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, storeKey []byte) {
userID, addrID, err := s.CreateUser("imap", password)
require.NoError(t, err)
labelID, err := s.CreateLabel(userID, "folder", "", proton.LabelTypeFolder)
require.NoError(t, err)
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, labelID, numMsg)
})
var allowSyncToProgress atomic.Bool
allowSyncToProgress.Store(false)
// Simulate 429 to prevent sync from progressing.
s.AddStatusHook(func(request *http.Request) (int, bool) {
if request.Method == "GET" && strings.Contains(request.URL.Path, "/mail/v4/messages/") {
if !allowSyncToProgress.Load() {
return http.StatusTooManyRequests, true
}
}
return 0, false
})
// The initial user should be fully synced.
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
syncStartedCh, syncStartedDone := chToType[events.Event, events.SyncStarted](bridge.GetEvents(events.SyncStarted{}))
defer syncStartedDone()
addressCreatedCh, addressCreatedDone := chToType[events.Event, events.UserAddressCreated](bridge.GetEvents(events.UserAddressCreated{}))
defer addressCreatedDone()
userID, err := bridge.LoginFull(ctx, "imap", password, nil, nil)
require.NoError(t, err)
require.Equal(t, userID, (<-syncStartedCh).UserID)
// create 20 more messages and move them to inbox
withClient(ctx, t, s, "imap", password, func(ctx context.Context, c *proton.Client) {
createNumMessages(ctx, t, c, addrID, proton.InboxLabel, 20)
})
// User AddrID2 event as a check point to see when the new address was created.
addrID, err := s.CreateAddress(userID, "bar@proton.ch", password)
require.NoError(t, err)
// At most two events can be published, one for the first address, then for the second.
// if the second event is not `addrID` then something went wrong.
event := <-addressCreatedCh
require.Equal(t, addrID, event.AddressID)
allowSyncToProgress.Store(true)
info, err := bridge.GetUserInfo(userID)
require.NoError(t, err)
client, err := eventuallyDial(fmt.Sprintf("%v:%v", constants.Host, bridge.GetIMAPPort()))
require.NoError(t, err)
require.NoError(t, client.Login(info.Addresses[0], string(info.BridgePass)))
defer func() { _ = client.Logout() }()
require.Eventually(t, func() bool {
// Finally check if the 20 messages are in INBOX.
status, err := client.Status("INBOX", []imap.StatusItem{imap.StatusMessages})
require.NoError(t, err)
return uint32(20) == status.Messages
}, 10*time.Second, time.Second)
})
}, server.WithTLS(false))
}
func withClient(ctx context.Context, t *testing.T, s *server.Server, username string, password []byte, fn func(context.Context, *proton.Client)) { //nolint:unparam
m := proton.New(
proton.WithHostURL(s.GetHostURL()),

View File

@ -324,6 +324,8 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
MessageHandler: s,
}
syncEventHandler := s.newSyncEventHandler()
s.eventProvider.Subscribe(s.subscription)
defer s.eventProvider.Unsubscribe(s.subscription)
@ -441,12 +443,13 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
}
e.Consume(func(event proton.Event) error {
if s.isSyncing {
if err := syncEventHandler.OnEvent(ctx, event); err != nil {
return err
}
// We need to reset the sync if we receive a refresh event during a sync and update
// the last event id to avoid problems.
if event.Refresh&proton.RefreshMail != 0 {
if err := s.HandleRefreshEvent(ctx, 0); err != nil {
return err
}
s.lastHandledEventID = event.EventID
}

View File

@ -44,7 +44,7 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa
switch event.Action {
case proton.EventCreate:
updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message)
updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message, false)
if err != nil {
reportError(s.reporter, s.log, "Failed to apply create message event", err)
return fmt.Errorf("failed to handle create message event: %w", err)
@ -93,7 +93,7 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa
if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) {
s.log.WithError(err).Error("Failed to handle update message event in gluon, will try creating it")
updates, err := onMessageCreated(ctx, s, event.Message)
updates, err := onMessageCreated(ctx, s, event.Message, false)
if err != nil {
return fmt.Errorf("failed to handle update message event as create: %w", err)
}
@ -121,7 +121,12 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa
return nil
}
func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) {
func onMessageCreated(
ctx context.Context,
s *Service,
message proton.MessageMetadata,
allowUnknownLabels bool,
) ([]imap.Update, error) {
s.log.WithFields(logrus.Fields{
"messageID": message.ID,
"subject": logging.Sensitive(message.Subject),
@ -161,7 +166,7 @@ func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMet
s.log.WithError(err).Error("Failed to remove failed message ID from vault")
}
update = imap.NewMessagesCreated(false, res.update)
update = imap.NewMessagesCreated(allowUnknownLabels, res.update)
didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update)
if err != nil {
return err

View File

@ -0,0 +1,82 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"context"
"fmt"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/proton-bridge/v3/internal/logging"
"github.com/ProtonMail/proton-bridge/v3/internal/services/userevents"
)
func (s *Service) newSyncEventHandler() userevents.EventHandler {
return userevents.EventHandler{
RefreshHandler: s,
AddressHandler: s,
UserHandler: s,
LabelHandler: nil,
MessageHandler: &syncMessageEventHandler{service: s},
UsedSpaceHandler: nil,
UserSettingsHandler: nil,
}
}
type syncMessageEventHandler struct {
service *Service
}
func (s syncMessageEventHandler) HandleMessageEvents(ctx context.Context, events []proton.MessageEvent) error {
s.service.log.Debug("handling message events (sync)")
for _, event := range events {
//nolint:exhaustive
switch event.Action {
case proton.EventCreate:
updates, err := onMessageCreated(
logging.WithLogrusField(ctx, "action", "create message (sync)"),
s.service,
event.Message,
true,
)
if err != nil {
reportError(s.service.reporter, s.service.log, "Failed to apply create message event", err)
return fmt.Errorf("failed to handle create message event: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return err
}
case proton.EventDelete:
updates := onMessageDeleted(
logging.WithLogrusField(ctx, "action", "delete message (sync)"),
s.service,
event,
)
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return fmt.Errorf("failed to handle delete message event in gluon: %w", err)
}
default:
continue
}
}
return nil
}