mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-17 15:46:44 +00:00
feat(GODT-2366): Handle failed message updates as creates
This handles the following case: - event says message was created - we try to fetch the message but API says the doesn’t exist yet — we skip applying the “message created” update - event then says message was updated at some point in the future - we try to handle it but fail because we don’t have the message — we should treat it as a creation
This commit is contained in:
@ -23,6 +23,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/ProtonMail/gluon"
|
||||||
"github.com/ProtonMail/gluon/imap"
|
"github.com/ProtonMail/gluon/imap"
|
||||||
"github.com/ProtonMail/gluon/queue"
|
"github.com/ProtonMail/gluon/queue"
|
||||||
"github.com/ProtonMail/gluon/reporter"
|
"github.com/ProtonMail/gluon/reporter"
|
||||||
@ -415,9 +416,7 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
|
|||||||
|
|
||||||
switch event.Action {
|
switch event.Action {
|
||||||
case proton.EventCreate:
|
case proton.EventCreate:
|
||||||
updates, err := user.handleCreateMessageEvent(
|
updates, err := user.handleCreateMessageEvent(logging.WithLogrusField(ctx, "action", "create message"), event.Message)
|
||||||
logging.WithLogrusField(ctx, "action", "create message"),
|
|
||||||
event)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if rerr := user.reporter.ReportMessageWithContext("Failed to apply create message event", reporter.Context{
|
if rerr := user.reporter.ReportMessageWithContext("Failed to apply create message event", reporter.Context{
|
||||||
"error": err,
|
"error": err,
|
||||||
@ -446,6 +445,7 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
|
|||||||
}); rerr != nil {
|
}); rerr != nil {
|
||||||
user.log.WithError(err).Error("Failed to report update draft message event error")
|
user.log.WithError(err).Error("Failed to report update draft message event error")
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("failed to handle update draft event: %w", err)
|
return fmt.Errorf("failed to handle update draft event: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -462,7 +462,7 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
|
|||||||
// Issue regular update to handle mailboxes and flag changes.
|
// Issue regular update to handle mailboxes and flag changes.
|
||||||
updates, err := user.handleUpdateMessageEvent(
|
updates, err := user.handleUpdateMessageEvent(
|
||||||
logging.WithLogrusField(ctx, "action", "update message"),
|
logging.WithLogrusField(ctx, "action", "update message"),
|
||||||
event,
|
event.Message,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if rerr := user.reporter.ReportMessageWithContext("Failed to apply update message event", reporter.Context{
|
if rerr := user.reporter.ReportMessageWithContext("Failed to apply update message event", reporter.Context{
|
||||||
@ -470,12 +470,25 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
|
|||||||
}); rerr != nil {
|
}); rerr != nil {
|
||||||
user.log.WithError(err).Error("Failed to report update message event error")
|
user.log.WithError(err).Error("Failed to report update message event error")
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("failed to handle update message event: %w", err)
|
return fmt.Errorf("failed to handle update message event: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the update fails on the gluon side because it doesn't exist, we try to create the message instead.
|
||||||
|
if err := waitOnIMAPUpdates(ctx, updates); gluon.IsNoSuchMessage(err) {
|
||||||
|
user.log.WithError(err).Error("Failed to handle update message event in gluon, will try creating it")
|
||||||
|
|
||||||
|
updates, err := user.handleCreateMessageEvent(ctx, event.Message)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to handle update message event as create: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
|
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case proton.EventDelete:
|
case proton.EventDelete:
|
||||||
updates, err := user.handleDeleteMessageEvent(
|
updates, err := user.handleDeleteMessageEvent(
|
||||||
@ -488,6 +501,7 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
|
|||||||
}); rerr != nil {
|
}); rerr != nil {
|
||||||
user.log.WithError(err).Error("Failed to report delete message event error")
|
user.log.WithError(err).Error("Failed to report delete message event error")
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("failed to handle delete message event: %w", err)
|
return fmt.Errorf("failed to handle delete message event: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -500,17 +514,17 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) {
|
func (user *User) handleCreateMessageEvent(ctx context.Context, message proton.MessageMetadata) ([]imap.Update, error) {
|
||||||
user.log.WithFields(logrus.Fields{
|
user.log.WithFields(logrus.Fields{
|
||||||
"messageID": event.ID,
|
"messageID": message.ID,
|
||||||
"subject": logging.Sensitive(event.Message.Subject),
|
"subject": logging.Sensitive(message.Subject),
|
||||||
}).Info("Handling message created event")
|
}).Info("Handling message created event")
|
||||||
|
|
||||||
full, err := user.client.GetFullMessage(ctx, event.Message.ID)
|
full, err := user.client.GetFullMessage(ctx, message.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If the message is not found, it means that it has been deleted before we could fetch it.
|
// If the message is not found, it means that it has been deleted before we could fetch it.
|
||||||
if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status == http.StatusUnprocessableEntity {
|
if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status == http.StatusUnprocessableEntity {
|
||||||
user.log.WithField("messageID", event.Message.ID).Warn("Cannot create new message: full message is missing on API")
|
user.log.WithField("messageID", message.ID).Warn("Cannot create new message: full message is missing on API")
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -520,13 +534,13 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.Mes
|
|||||||
return safe.RLockRetErr(func() ([]imap.Update, error) {
|
return safe.RLockRetErr(func() ([]imap.Update, error) {
|
||||||
var update imap.Update
|
var update imap.Update
|
||||||
|
|
||||||
if err := withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error {
|
if err := withAddrKR(user.apiUser, user.apiAddrs[message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error {
|
||||||
res := buildRFC822(user.apiLabels, full, addrKR)
|
res := buildRFC822(user.apiLabels, full, addrKR)
|
||||||
|
|
||||||
if res.err != nil {
|
if res.err != nil {
|
||||||
user.log.WithError(err).Error("Failed to build RFC822 message")
|
user.log.WithError(err).Error("Failed to build RFC822 message")
|
||||||
|
|
||||||
if err := user.vault.AddFailedMessageID(event.ID); err != nil {
|
if err := user.vault.AddFailedMessageID(message.ID); err != nil {
|
||||||
user.log.WithError(err).Error("Failed to add failed message ID to vault")
|
user.log.WithError(err).Error("Failed to add failed message ID to vault")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -540,7 +554,7 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.Mes
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := user.vault.RemFailedMessageID(event.ID); err != nil {
|
if err := user.vault.RemFailedMessageID(message.ID); err != nil {
|
||||||
user.log.WithError(err).Error("Failed to remove failed message ID from vault")
|
user.log.WithError(err).Error("Failed to remove failed message ID from vault")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -556,21 +570,21 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.Mes
|
|||||||
}, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock)
|
}, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (user *User) handleUpdateMessageEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) { //nolint:unparam
|
func (user *User) handleUpdateMessageEvent(ctx context.Context, message proton.MessageMetadata) ([]imap.Update, error) { //nolint:unparam
|
||||||
return safe.RLockRetErr(func() ([]imap.Update, error) {
|
return safe.RLockRetErr(func() ([]imap.Update, error) {
|
||||||
user.log.WithFields(logrus.Fields{
|
user.log.WithFields(logrus.Fields{
|
||||||
"messageID": event.ID,
|
"messageID": message.ID,
|
||||||
"subject": logging.Sensitive(event.Message.Subject),
|
"subject": logging.Sensitive(message.Subject),
|
||||||
}).Info("Handling message updated event")
|
}).Info("Handling message updated event")
|
||||||
|
|
||||||
update := imap.NewMessageMailboxesUpdated(
|
update := imap.NewMessageMailboxesUpdated(
|
||||||
imap.MessageID(event.ID),
|
imap.MessageID(message.ID),
|
||||||
mapTo[string, imap.MailboxID](wantLabels(user.apiLabels, event.Message.LabelIDs)),
|
mapTo[string, imap.MailboxID](wantLabels(user.apiLabels, message.LabelIDs)),
|
||||||
event.Message.Seen(),
|
message.Seen(),
|
||||||
event.Message.Starred(),
|
message.Starred(),
|
||||||
)
|
)
|
||||||
|
|
||||||
user.updateCh[event.Message.AddressID].Enqueue(update)
|
user.updateCh[message.AddressID].Enqueue(update)
|
||||||
|
|
||||||
return []imap.Update{update}, nil
|
return []imap.Update{update}, nil
|
||||||
}, user.apiLabelsLock, user.updateChLock)
|
}, user.apiLabelsLock, user.updateChLock)
|
||||||
|
|||||||
Reference in New Issue
Block a user