diff --git a/go.mod b/go.mod index 99aa4b10..723d7322 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.21.9 require ( github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557 github.com/Masterminds/semver/v3 v3.2.0 - github.com/ProtonMail/gluon v0.17.1-0.20241121121545-aa1cfd19b4b2 + github.com/ProtonMail/gluon v0.17.1-0.20250116113909-2ebd96ec0bc2 github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a github.com/ProtonMail/go-proton-api v0.4.1-0.20240918100656-b4860af56d47 github.com/ProtonMail/gopenpgp/v2 v2.7.4-proton diff --git a/go.sum b/go.sum index 9d5e7a20..ae931fb7 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,10 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo= github.com/ProtonMail/gluon v0.17.1-0.20241121121545-aa1cfd19b4b2 h1:iZjKvjb6VkGb52ZaBBiXC1MGYJN4C/S97JfppdzpMHQ= github.com/ProtonMail/gluon v0.17.1-0.20241121121545-aa1cfd19b4b2/go.mod h1:0/c03TzZPNiSgY5UDJK1iRDkjlDPwWugxTT6et2qDu8= +github.com/ProtonMail/gluon v0.17.1-0.20250110112112-749d4bc44766 h1:2LiufnjGxlne+zh2J3os/InRPvnoygvSZBR7pFGqAqg= +github.com/ProtonMail/gluon v0.17.1-0.20250110112112-749d4bc44766/go.mod h1:0/c03TzZPNiSgY5UDJK1iRDkjlDPwWugxTT6et2qDu8= +github.com/ProtonMail/gluon v0.17.1-0.20250116113909-2ebd96ec0bc2 h1:lDgMidI/9j2eedavcy7YICv8+F73ooVTUoUGBE4dO0s= +github.com/ProtonMail/gluon v0.17.1-0.20250116113909-2ebd96ec0bc2/go.mod h1:0/c03TzZPNiSgY5UDJK1iRDkjlDPwWugxTT6et2qDu8= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4= github.com/ProtonMail/go-crypto v0.0.0-20230321155629-9a39f2531310/go.mod h1:8TI4H3IbrackdNgv+92dI+rhpCaLqM0IfpgCgenFvRE= diff --git a/internal/services/imapservice/observabilitymetrics/evtloopmsgevents/metrics.go b/internal/services/imapservice/observabilitymetrics/evtloopmsgevents/metrics.go index b0ae4147..31ead4c2 100644 --- a/internal/services/imapservice/observabilitymetrics/evtloopmsgevents/metrics.go +++ b/internal/services/imapservice/observabilitymetrics/evtloopmsgevents/metrics.go @@ -25,7 +25,7 @@ import ( const ( messageEventErrorCaseSchemaName = "bridge_event_loop_message_event_failures_total" - messageEventErrorCaseSchemaVersion = 1 + messageEventErrorCaseSchemaVersion = 2 ) func generateMessageEventFailureObservabilityMetric(eventType string) proton.ObservabilityMetric { @@ -54,12 +54,12 @@ func GenerateMessageEventFailureUpdateMetric() proton.ObservabilityMetric { return generateMessageEventFailureObservabilityMetric("updateEvent") } -func GenerateMessageEventFailedToBuildMessage() proton.ObservabilityMetric { - return generateMessageEventFailureObservabilityMetric("failedToBuildMessage") +func GenerateMessageEventFailureUpdateDraftMetric() proton.ObservabilityMetric { + return generateMessageEventFailureObservabilityMetric("updateEventDraft") } -func GenerateMessageEventFailedToBuildDraft() proton.ObservabilityMetric { - return generateMessageEventFailureObservabilityMetric("failedToBuildDraft") +func GenerateMessageEventFailureUpdateCreateMetric() proton.ObservabilityMetric { + return generateMessageEventFailureObservabilityMetric("updateEventCreation") } func GenerateMessageEventUpdateChannelDoesNotExist() proton.ObservabilityMetric { diff --git a/internal/services/imapservice/observabilitymetrics/syncmsgevents/metrics.go b/internal/services/imapservice/observabilitymetrics/syncmsgevents/metrics.go index 25ad75f7..b9df6c6b 100644 --- a/internal/services/imapservice/observabilitymetrics/syncmsgevents/metrics.go +++ b/internal/services/imapservice/observabilitymetrics/syncmsgevents/metrics.go @@ -25,7 +25,7 @@ import ( const ( syncEventErrorCaseSchemaName = "bridge_sync_message_event_failures_total" - syncEventErrorCaseSchemaVersion = 1 + syncEventErrorCaseSchemaVersion = 2 ) func generateSyncEventFailureObservabilityMetric(eventType string) proton.ObservabilityMetric { @@ -49,3 +49,19 @@ func GenerateSyncFailureCreateMessageEventMetric() proton.ObservabilityMetric { func GenerateSyncFailureDeleteMessageEventMetric() proton.ObservabilityMetric { return generateSyncEventFailureObservabilityMetric("deleteMessageEvent") } + +func GenerateSyncFailureUpdateMessageEventMetric() proton.ObservabilityMetric { + return generateSyncEventFailureObservabilityMetric("updateMessageEvent") +} + +func GenerateSyncFailureUpdateMessageDraftEventMetric() proton.ObservabilityMetric { + return generateSyncEventFailureObservabilityMetric("updateMessageDraftEvent") +} + +func GenerateSyncFailureUpdateMessageCreateEventMetric() proton.ObservabilityMetric { + return generateSyncEventFailureObservabilityMetric("updateMessageEventCreationFailed") +} + +func GenerateSyncFailureMessageUpdateChannelDoesNotExist() proton.ObservabilityMetric { + return generateSyncEventFailureObservabilityMetric("messageUpdateChannelDoesNotExist") +} diff --git a/internal/services/imapservice/service_message_events.go b/internal/services/imapservice/service_message_events.go index 62198b6d..d7b498a2 100644 --- a/internal/services/imapservice/service_message_events.go +++ b/internal/services/imapservice/service_message_events.go @@ -30,6 +30,7 @@ import ( "github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/ProtonMail/proton-bridge/v3/internal/logging" obsMetrics "github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice/observabilitymetrics/evtloopmsgevents" + obsMetricsSynchronization "github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice/observabilitymetrics/syncmsgevents" "github.com/ProtonMail/proton-bridge/v3/internal/services/observability" "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" "github.com/sirupsen/logrus" @@ -44,9 +45,18 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa switch event.Action { case proton.EventCreate: - updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message, false) + updates, err := onMessageCreated( + logging.WithLogrusField(ctx, "action", "create message"), + s, + event.Message, + false, + false, + ) if err != nil { - s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailureCreateMessageMetric()) + s.observabilitySender.AddDistinctMetrics( + observability.EventLoopError, + obsMetrics.GenerateMessageEventFailureCreateMessageMetric(), + ) return fmt.Errorf("failed to handle create message event: %w", err) } @@ -62,9 +72,14 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa logging.WithLogrusField(ctx, "action", "update draft or sent message"), s, event, + false, ) if err != nil { - s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailureUpdateMetric()) + s.observabilitySender.AddDistinctMetrics( + observability.EventLoopError, + obsMetrics.GenerateMessageEventFailureUpdateMetric(), + obsMetrics.GenerateMessageEventFailureUpdateDraftMetric(), + ) return fmt.Errorf("failed to handle update draft event: %w", err) } @@ -83,9 +98,13 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa logging.WithLogrusField(ctx, "action", "update message"), s, event.Message, + false, ) if err != nil { - s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailureUpdateMetric()) + s.observabilitySender.AddDistinctMetrics( + observability.EventLoopError, + obsMetrics.GenerateMessageEventFailureUpdateMetric(), + ) return fmt.Errorf("failed to handle update message event: %w", err) } @@ -93,8 +112,13 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) { s.log.WithError(err).Error("Failed to handle update message event in gluon, will try creating it") - updates, err := onMessageCreated(ctx, s, event.Message, false) + updates, err := onMessageCreated(ctx, s, event.Message, false, false) if err != nil { + s.observabilitySender.AddDistinctMetrics( + observability.EventLoopError, + obsMetrics.GenerateMessageEventFailureUpdateMetric(), + obsMetrics.GenerateMessageEventFailureUpdateCreateMetric(), + ) return fmt.Errorf("failed to handle update message event as create: %w", err) } @@ -126,7 +150,7 @@ func onMessageCreated( ctx context.Context, s *Service, message proton.MessageMetadata, - allowUnknownLabels bool, + allowUnknownLabels, duringSync bool, ) ([]imap.Update, error) { s.log.WithFields(logrus.Fields{ "messageID": message.ID, @@ -159,7 +183,6 @@ func onMessageCreated( s.log.WithError(err).Error("Failed to add failed message ID to vault") } - s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailedToBuildMessage()) return nil } @@ -168,7 +191,7 @@ func onMessageCreated( } update = imap.NewMessagesCreated(allowUnknownLabels, res.update) - didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update) + didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update, duringSync) if err != nil { return err } @@ -189,7 +212,7 @@ func onMessageCreated( return []imap.Update{update}, nil } -func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.MessageEvent) ([]imap.Update, error) { +func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.MessageEvent, duringSync bool) ([]imap.Update, error) { s.log.WithFields(logrus.Fields{ "messageID": event.ID, "subject": logging.Sensitive(event.Message.Subject), @@ -221,7 +244,6 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me s.log.WithError(err).Error("Failed to add failed message ID to vault") } - s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailedToBuildDraft()) return nil } @@ -235,9 +257,10 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me res.update.MailboxIDs, res.update.ParsedMessage, true, // Is the message doesn't exist, silently create it. + false, ) - didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update) + didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update, duringSync) if err != nil { return err } @@ -258,7 +281,7 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me return []imap.Update{update}, nil } -func onMessageUpdate(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) { +func onMessageUpdate(ctx context.Context, s *Service, message proton.MessageMetadata, duringSync bool) ([]imap.Update, error) { s.log.WithFields(logrus.Fields{ "messageID": message.ID, "subject": logging.Sensitive(message.Subject), @@ -272,7 +295,7 @@ func onMessageUpdate(ctx context.Context, s *Service, message proton.MessageMeta flags, ) - didPublish, err := safePublishMessageUpdate(ctx, s, message.AddressID, update) + didPublish, err := safePublishMessageUpdate(ctx, s, message.AddressID, update, duringSync) if err != nil { return nil, err } @@ -303,7 +326,7 @@ func onMessageDeleted(ctx context.Context, s *Service, event proton.MessageEvent // create/update message. // If the user is in combined mode, we simply push the update to the primary address. If the user is in split mode // we do not publish the update as the address no longer exists. -func safePublishMessageUpdate(ctx context.Context, s *Service, addressID string, update imap.Update) (bool, error) { +func safePublishMessageUpdate(ctx context.Context, s *Service, addressID string, update imap.Update, duringSync bool) (bool, error) { v, ok := s.connectors[addressID] if !ok { if s.addressMode == usertypes.AddressModeCombined { @@ -322,8 +345,11 @@ func safePublishMessageUpdate(ctx context.Context, s *Service, addressID string, } logrus.Warnf("Update channel not found for address %v, it may have been already deleted", addressID) - s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventUpdateChannelDoesNotExist()) - + if duringSync { + s.observabilitySender.AddDistinctMetrics(observability.SyncError, obsMetricsSynchronization.GenerateSyncFailureMessageUpdateChannelDoesNotExist()) + } else { + s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventUpdateChannelDoesNotExist()) + } return false, nil } diff --git a/internal/services/imapservice/service_sync_events.go b/internal/services/imapservice/service_sync_events.go index 92fd2600..e3974b2a 100644 --- a/internal/services/imapservice/service_sync_events.go +++ b/internal/services/imapservice/service_sync_events.go @@ -21,11 +21,13 @@ import ( "context" "fmt" + "github.com/ProtonMail/gluon" "github.com/ProtonMail/go-proton-api" "github.com/ProtonMail/proton-bridge/v3/internal/logging" obsMetrics "github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice/observabilitymetrics/syncmsgevents" "github.com/ProtonMail/proton-bridge/v3/internal/services/observability" "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents" + "github.com/sirupsen/logrus" ) func (s *Service) newSyncEventHandler() userevents.EventHandler { @@ -55,9 +57,13 @@ func (s syncMessageEventHandler) HandleMessageEvents(ctx context.Context, events s.service, event.Message, true, + true, ) if err != nil { - s.service.observabilitySender.AddDistinctMetrics(observability.SyncError, obsMetrics.GenerateSyncFailureCreateMessageEventMetric()) + s.service.observabilitySender.AddDistinctMetrics( + observability.SyncError, + obsMetrics.GenerateSyncFailureCreateMessageEventMetric(), + ) return fmt.Errorf("failed to handle create message event: %w", err) } @@ -65,20 +71,63 @@ func (s syncMessageEventHandler) HandleMessageEvents(ctx context.Context, events return err } - case proton.EventUpdate: + case proton.EventUpdate, proton.EventUpdateFlags: if event.Message.IsDraft() || (event.Message.Flags&proton.MessageFlagSent != 0) { updates, err := onMessageUpdateDraftOrSent( logging.WithLogrusField(ctx, "action", "update draft or sent message (sync)"), s.service, event, + true, ) if err != nil { + s.service.observabilitySender.AddDistinctMetrics( + observability.SyncError, + obsMetrics.GenerateSyncFailureUpdateMessageEventMetric(), + obsMetrics.GenerateSyncFailureUpdateMessageDraftEventMetric(), + ) return fmt.Errorf("failed to handle update draft event (sync): %w", err) } if err := waitOnIMAPUpdates(ctx, updates); err != nil { return err } + + continue + } + + updates, err := onMessageUpdate( + logging.WithLogrusField(ctx, "action", "update message (sync)"), + s.service, + event.Message, + true, + ) + if err != nil { + s.service.observabilitySender.AddDistinctMetrics( + observability.SyncError, + obsMetrics.GenerateSyncFailureUpdateMessageEventMetric(), + ) + return fmt.Errorf("failed to handle update message event (sync): %w", err) + } + + // If the update fails on the gluon side because it doesn't exist, we try to create the message instead. + if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) { + logrus.WithError(err).Error("Failed to handle update message event in gluon, will try creating it (sync)") + + updates, err := onMessageCreated(ctx, s.service, event.Message, false, true) + if err != nil { + s.service.observabilitySender.AddDistinctMetrics( + observability.SyncError, + obsMetrics.GenerateSyncFailureUpdateMessageEventMetric(), + obsMetrics.GenerateSyncFailureUpdateMessageCreateEventMetric(), + ) + return fmt.Errorf("failed to handle update message event as create (sync): %w", err) + } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + } else if err != nil { + return err } case proton.EventDelete: diff --git a/tests/observability_test.go b/tests/observability_test.go index 6bbd783f..57aaa495 100644 --- a/tests/observability_test.go +++ b/tests/observability_test.go @@ -74,6 +74,10 @@ func (s *scenario) syncFailureMessageEventsObservability(username string) error Metrics: []proton.ObservabilityMetric{ syncmsgevents.GenerateSyncFailureCreateMessageEventMetric(), syncmsgevents.GenerateSyncFailureDeleteMessageEventMetric(), + syncmsgevents.GenerateSyncFailureUpdateMessageEventMetric(), + syncmsgevents.GenerateSyncFailureUpdateMessageDraftEventMetric(), + syncmsgevents.GenerateSyncFailureUpdateMessageCreateEventMetric(), + syncmsgevents.GenerateSyncFailureMessageUpdateChannelDoesNotExist(), }, } return s.t.withClientPass(context.Background(), username, s.t.getUserByName(username).userPass, func(ctx context.Context, c *proton.Client) error { @@ -86,11 +90,11 @@ func (s *scenario) syncFailureMessageEventsObservability(username string) error func (s *scenario) eventLoopFailureMessageEventsObservability(username string) error { batch := proton.ObservabilityBatch{ Metrics: []proton.ObservabilityMetric{ - evtloopmsgevents.GenerateMessageEventFailedToBuildDraft(), - evtloopmsgevents.GenerateMessageEventFailedToBuildMessage(), evtloopmsgevents.GenerateMessageEventFailureCreateMessageMetric(), evtloopmsgevents.GenerateMessageEventFailureDeleteMessageMetric(), evtloopmsgevents.GenerateMessageEventFailureUpdateMetric(), + evtloopmsgevents.GenerateMessageEventFailureUpdateDraftMetric(), + evtloopmsgevents.GenerateMessageEventFailureUpdateCreateMetric(), evtloopmsgevents.GenerateMessageEventUpdateChannelDoesNotExist(), }, }