mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-17 23:56:56 +00:00
feat(BRIDGE-288): extended sync message update handler; observability tweaks; gluon bump;
This commit is contained in:
@ -25,7 +25,7 @@ import (
|
||||
|
||||
const (
|
||||
messageEventErrorCaseSchemaName = "bridge_event_loop_message_event_failures_total"
|
||||
messageEventErrorCaseSchemaVersion = 1
|
||||
messageEventErrorCaseSchemaVersion = 2
|
||||
)
|
||||
|
||||
func generateMessageEventFailureObservabilityMetric(eventType string) proton.ObservabilityMetric {
|
||||
@ -54,12 +54,12 @@ func GenerateMessageEventFailureUpdateMetric() proton.ObservabilityMetric {
|
||||
return generateMessageEventFailureObservabilityMetric("updateEvent")
|
||||
}
|
||||
|
||||
func GenerateMessageEventFailedToBuildMessage() proton.ObservabilityMetric {
|
||||
return generateMessageEventFailureObservabilityMetric("failedToBuildMessage")
|
||||
func GenerateMessageEventFailureUpdateDraftMetric() proton.ObservabilityMetric {
|
||||
return generateMessageEventFailureObservabilityMetric("updateEventDraft")
|
||||
}
|
||||
|
||||
func GenerateMessageEventFailedToBuildDraft() proton.ObservabilityMetric {
|
||||
return generateMessageEventFailureObservabilityMetric("failedToBuildDraft")
|
||||
func GenerateMessageEventFailureUpdateCreateMetric() proton.ObservabilityMetric {
|
||||
return generateMessageEventFailureObservabilityMetric("updateEventCreation")
|
||||
}
|
||||
|
||||
func GenerateMessageEventUpdateChannelDoesNotExist() proton.ObservabilityMetric {
|
||||
|
||||
@ -25,7 +25,7 @@ import (
|
||||
|
||||
const (
|
||||
syncEventErrorCaseSchemaName = "bridge_sync_message_event_failures_total"
|
||||
syncEventErrorCaseSchemaVersion = 1
|
||||
syncEventErrorCaseSchemaVersion = 2
|
||||
)
|
||||
|
||||
func generateSyncEventFailureObservabilityMetric(eventType string) proton.ObservabilityMetric {
|
||||
@ -49,3 +49,19 @@ func GenerateSyncFailureCreateMessageEventMetric() proton.ObservabilityMetric {
|
||||
func GenerateSyncFailureDeleteMessageEventMetric() proton.ObservabilityMetric {
|
||||
return generateSyncEventFailureObservabilityMetric("deleteMessageEvent")
|
||||
}
|
||||
|
||||
func GenerateSyncFailureUpdateMessageEventMetric() proton.ObservabilityMetric {
|
||||
return generateSyncEventFailureObservabilityMetric("updateMessageEvent")
|
||||
}
|
||||
|
||||
func GenerateSyncFailureUpdateMessageDraftEventMetric() proton.ObservabilityMetric {
|
||||
return generateSyncEventFailureObservabilityMetric("updateMessageDraftEvent")
|
||||
}
|
||||
|
||||
func GenerateSyncFailureUpdateMessageCreateEventMetric() proton.ObservabilityMetric {
|
||||
return generateSyncEventFailureObservabilityMetric("updateMessageEventCreationFailed")
|
||||
}
|
||||
|
||||
func GenerateSyncFailureMessageUpdateChannelDoesNotExist() proton.ObservabilityMetric {
|
||||
return generateSyncEventFailureObservabilityMetric("messageUpdateChannelDoesNotExist")
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ import (
|
||||
"github.com/ProtonMail/gopenpgp/v2/crypto"
|
||||
"github.com/ProtonMail/proton-bridge/v3/internal/logging"
|
||||
obsMetrics "github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice/observabilitymetrics/evtloopmsgevents"
|
||||
obsMetricsSynchronization "github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice/observabilitymetrics/syncmsgevents"
|
||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
|
||||
"github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
|
||||
"github.com/sirupsen/logrus"
|
||||
@ -44,9 +45,18 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa
|
||||
|
||||
switch event.Action {
|
||||
case proton.EventCreate:
|
||||
updates, err := onMessageCreated(logging.WithLogrusField(ctx, "action", "create message"), s, event.Message, false)
|
||||
updates, err := onMessageCreated(
|
||||
logging.WithLogrusField(ctx, "action", "create message"),
|
||||
s,
|
||||
event.Message,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailureCreateMessageMetric())
|
||||
s.observabilitySender.AddDistinctMetrics(
|
||||
observability.EventLoopError,
|
||||
obsMetrics.GenerateMessageEventFailureCreateMessageMetric(),
|
||||
)
|
||||
return fmt.Errorf("failed to handle create message event: %w", err)
|
||||
}
|
||||
|
||||
@ -62,9 +72,14 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa
|
||||
logging.WithLogrusField(ctx, "action", "update draft or sent message"),
|
||||
s,
|
||||
event,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailureUpdateMetric())
|
||||
s.observabilitySender.AddDistinctMetrics(
|
||||
observability.EventLoopError,
|
||||
obsMetrics.GenerateMessageEventFailureUpdateMetric(),
|
||||
obsMetrics.GenerateMessageEventFailureUpdateDraftMetric(),
|
||||
)
|
||||
return fmt.Errorf("failed to handle update draft event: %w", err)
|
||||
}
|
||||
|
||||
@ -83,9 +98,13 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa
|
||||
logging.WithLogrusField(ctx, "action", "update message"),
|
||||
s,
|
||||
event.Message,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailureUpdateMetric())
|
||||
s.observabilitySender.AddDistinctMetrics(
|
||||
observability.EventLoopError,
|
||||
obsMetrics.GenerateMessageEventFailureUpdateMetric(),
|
||||
)
|
||||
return fmt.Errorf("failed to handle update message event: %w", err)
|
||||
}
|
||||
|
||||
@ -93,8 +112,13 @@ func (s *Service) HandleMessageEvents(ctx context.Context, events []proton.Messa
|
||||
if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) {
|
||||
s.log.WithError(err).Error("Failed to handle update message event in gluon, will try creating it")
|
||||
|
||||
updates, err := onMessageCreated(ctx, s, event.Message, false)
|
||||
updates, err := onMessageCreated(ctx, s, event.Message, false, false)
|
||||
if err != nil {
|
||||
s.observabilitySender.AddDistinctMetrics(
|
||||
observability.EventLoopError,
|
||||
obsMetrics.GenerateMessageEventFailureUpdateMetric(),
|
||||
obsMetrics.GenerateMessageEventFailureUpdateCreateMetric(),
|
||||
)
|
||||
return fmt.Errorf("failed to handle update message event as create: %w", err)
|
||||
}
|
||||
|
||||
@ -126,7 +150,7 @@ func onMessageCreated(
|
||||
ctx context.Context,
|
||||
s *Service,
|
||||
message proton.MessageMetadata,
|
||||
allowUnknownLabels bool,
|
||||
allowUnknownLabels, duringSync bool,
|
||||
) ([]imap.Update, error) {
|
||||
s.log.WithFields(logrus.Fields{
|
||||
"messageID": message.ID,
|
||||
@ -159,7 +183,6 @@ func onMessageCreated(
|
||||
s.log.WithError(err).Error("Failed to add failed message ID to vault")
|
||||
}
|
||||
|
||||
s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailedToBuildMessage())
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -168,7 +191,7 @@ func onMessageCreated(
|
||||
}
|
||||
|
||||
update = imap.NewMessagesCreated(allowUnknownLabels, res.update)
|
||||
didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update)
|
||||
didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update, duringSync)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -189,7 +212,7 @@ func onMessageCreated(
|
||||
return []imap.Update{update}, nil
|
||||
}
|
||||
|
||||
func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.MessageEvent) ([]imap.Update, error) {
|
||||
func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.MessageEvent, duringSync bool) ([]imap.Update, error) {
|
||||
s.log.WithFields(logrus.Fields{
|
||||
"messageID": event.ID,
|
||||
"subject": logging.Sensitive(event.Message.Subject),
|
||||
@ -221,7 +244,6 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me
|
||||
s.log.WithError(err).Error("Failed to add failed message ID to vault")
|
||||
}
|
||||
|
||||
s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventFailedToBuildDraft())
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -235,9 +257,10 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me
|
||||
res.update.MailboxIDs,
|
||||
res.update.ParsedMessage,
|
||||
true, // Is the message doesn't exist, silently create it.
|
||||
false,
|
||||
)
|
||||
|
||||
didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update)
|
||||
didPublish, err := safePublishMessageUpdate(ctx, s, full.AddressID, update, duringSync)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -258,7 +281,7 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me
|
||||
return []imap.Update{update}, nil
|
||||
}
|
||||
|
||||
func onMessageUpdate(ctx context.Context, s *Service, message proton.MessageMetadata) ([]imap.Update, error) {
|
||||
func onMessageUpdate(ctx context.Context, s *Service, message proton.MessageMetadata, duringSync bool) ([]imap.Update, error) {
|
||||
s.log.WithFields(logrus.Fields{
|
||||
"messageID": message.ID,
|
||||
"subject": logging.Sensitive(message.Subject),
|
||||
@ -272,7 +295,7 @@ func onMessageUpdate(ctx context.Context, s *Service, message proton.MessageMeta
|
||||
flags,
|
||||
)
|
||||
|
||||
didPublish, err := safePublishMessageUpdate(ctx, s, message.AddressID, update)
|
||||
didPublish, err := safePublishMessageUpdate(ctx, s, message.AddressID, update, duringSync)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -303,7 +326,7 @@ func onMessageDeleted(ctx context.Context, s *Service, event proton.MessageEvent
|
||||
// create/update message.
|
||||
// If the user is in combined mode, we simply push the update to the primary address. If the user is in split mode
|
||||
// we do not publish the update as the address no longer exists.
|
||||
func safePublishMessageUpdate(ctx context.Context, s *Service, addressID string, update imap.Update) (bool, error) {
|
||||
func safePublishMessageUpdate(ctx context.Context, s *Service, addressID string, update imap.Update, duringSync bool) (bool, error) {
|
||||
v, ok := s.connectors[addressID]
|
||||
if !ok {
|
||||
if s.addressMode == usertypes.AddressModeCombined {
|
||||
@ -322,8 +345,11 @@ func safePublishMessageUpdate(ctx context.Context, s *Service, addressID string,
|
||||
}
|
||||
|
||||
logrus.Warnf("Update channel not found for address %v, it may have been already deleted", addressID)
|
||||
s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventUpdateChannelDoesNotExist())
|
||||
|
||||
if duringSync {
|
||||
s.observabilitySender.AddDistinctMetrics(observability.SyncError, obsMetricsSynchronization.GenerateSyncFailureMessageUpdateChannelDoesNotExist())
|
||||
} else {
|
||||
s.observabilitySender.AddDistinctMetrics(observability.EventLoopError, obsMetrics.GenerateMessageEventUpdateChannelDoesNotExist())
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
||||
@ -21,11 +21,13 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/ProtonMail/gluon"
|
||||
"github.com/ProtonMail/go-proton-api"
|
||||
"github.com/ProtonMail/proton-bridge/v3/internal/logging"
|
||||
obsMetrics "github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice/observabilitymetrics/syncmsgevents"
|
||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
|
||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/userevents"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (s *Service) newSyncEventHandler() userevents.EventHandler {
|
||||
@ -55,9 +57,13 @@ func (s syncMessageEventHandler) HandleMessageEvents(ctx context.Context, events
|
||||
s.service,
|
||||
event.Message,
|
||||
true,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
s.service.observabilitySender.AddDistinctMetrics(observability.SyncError, obsMetrics.GenerateSyncFailureCreateMessageEventMetric())
|
||||
s.service.observabilitySender.AddDistinctMetrics(
|
||||
observability.SyncError,
|
||||
obsMetrics.GenerateSyncFailureCreateMessageEventMetric(),
|
||||
)
|
||||
return fmt.Errorf("failed to handle create message event: %w", err)
|
||||
}
|
||||
|
||||
@ -65,20 +71,63 @@ func (s syncMessageEventHandler) HandleMessageEvents(ctx context.Context, events
|
||||
return err
|
||||
}
|
||||
|
||||
case proton.EventUpdate:
|
||||
case proton.EventUpdate, proton.EventUpdateFlags:
|
||||
if event.Message.IsDraft() || (event.Message.Flags&proton.MessageFlagSent != 0) {
|
||||
updates, err := onMessageUpdateDraftOrSent(
|
||||
logging.WithLogrusField(ctx, "action", "update draft or sent message (sync)"),
|
||||
s.service,
|
||||
event,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
s.service.observabilitySender.AddDistinctMetrics(
|
||||
observability.SyncError,
|
||||
obsMetrics.GenerateSyncFailureUpdateMessageEventMetric(),
|
||||
obsMetrics.GenerateSyncFailureUpdateMessageDraftEventMetric(),
|
||||
)
|
||||
return fmt.Errorf("failed to handle update draft event (sync): %w", err)
|
||||
}
|
||||
|
||||
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
updates, err := onMessageUpdate(
|
||||
logging.WithLogrusField(ctx, "action", "update message (sync)"),
|
||||
s.service,
|
||||
event.Message,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
s.service.observabilitySender.AddDistinctMetrics(
|
||||
observability.SyncError,
|
||||
obsMetrics.GenerateSyncFailureUpdateMessageEventMetric(),
|
||||
)
|
||||
return fmt.Errorf("failed to handle update message event (sync): %w", err)
|
||||
}
|
||||
|
||||
// If the update fails on the gluon side because it doesn't exist, we try to create the message instead.
|
||||
if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) {
|
||||
logrus.WithError(err).Error("Failed to handle update message event in gluon, will try creating it (sync)")
|
||||
|
||||
updates, err := onMessageCreated(ctx, s.service, event.Message, false, true)
|
||||
if err != nil {
|
||||
s.service.observabilitySender.AddDistinctMetrics(
|
||||
observability.SyncError,
|
||||
obsMetrics.GenerateSyncFailureUpdateMessageEventMetric(),
|
||||
obsMetrics.GenerateSyncFailureUpdateMessageCreateEventMetric(),
|
||||
)
|
||||
return fmt.Errorf("failed to handle update message event as create (sync): %w", err)
|
||||
}
|
||||
|
||||
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case proton.EventDelete:
|
||||
|
||||
Reference in New Issue
Block a user