From 93c7552a417cf48579b875ef529fc022a276e0c3 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Fri, 13 Jan 2023 15:24:09 +0100 Subject: [PATCH] GODT-2202: Report update errors from Gluon For every update sent to gluon wait and check the error code to see if an error occurred. Note: Updates can't be inspect on the call site as it can lead to deadlocks. --- go.mod | 2 +- go.sum | 4 +- internal/user/events.go | 170 ++++++++++++++++++++++++---------- internal/user/sync.go | 53 ++++++----- internal/user/sync_flusher.go | 15 ++- internal/user/user.go | 12 --- internal/user/user_test.go | 4 +- 7 files changed, 169 insertions(+), 91 deletions(-) diff --git a/go.mod b/go.mod index 1b47f3ad..5166e91d 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557 github.com/Masterminds/semver/v3 v3.1.1 - github.com/ProtonMail/gluon v0.14.2-0.20230112101229-07a5a074643e + github.com/ProtonMail/gluon v0.14.2-0.20230113145313-7dc070e73340 github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a github.com/ProtonMail/go-proton-api v0.2.4-0.20230112102613-6ad201cdb337 github.com/ProtonMail/go-rfc5322 v0.11.0 diff --git a/go.sum b/go.sum index 06d3d26a..82a0fd20 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo= github.com/ProtonMail/docker-credential-helpers v1.1.0 h1:+kvUIpwWcbtP3WFv5sSvkFn/XLzSqPOB5AAthuk9xPk= github.com/ProtonMail/docker-credential-helpers v1.1.0/go.mod h1:mK0aBveCxhnQ756AmaTfXMZDeULvheYVhF/MWMErN5g= -github.com/ProtonMail/gluon v0.14.2-0.20230112101229-07a5a074643e h1:3UfVqUoBAh8R+FO+F+l0XkXerO6v0lD8waMFfwkdP/c= -github.com/ProtonMail/gluon v0.14.2-0.20230112101229-07a5a074643e/go.mod h1:z2AxLIiBCT1K+0OBHyaDI7AEaO5qI6/BEC2TE42vs4Q= +github.com/ProtonMail/gluon v0.14.2-0.20230113145313-7dc070e73340 h1:NrE0XbpppwSPRDbhK0LMoyIkE/+89Nj83MF9jg/f0X8= +github.com/ProtonMail/gluon v0.14.2-0.20230113145313-7dc070e73340/go.mod h1:z2AxLIiBCT1K+0OBHyaDI7AEaO5qI6/BEC2TE42vs4Q= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4= github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo= diff --git a/internal/user/events.go b/internal/user/events.go index 363ecec6..707a83b4 100644 --- a/internal/user/events.go +++ b/internal/user/events.go @@ -294,27 +294,43 @@ func (user *User) handleLabelEvents(ctx context.Context, labelEvents []proton.La for _, event := range labelEvents { switch event.Action { case proton.EventCreate: - if err := user.handleCreateLabelEvent(ctx, event); err != nil { + updates, err := user.handleCreateLabelEvent(ctx, event) + if err != nil { return fmt.Errorf("failed to handle create label event: %w", err) } + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + case proton.EventUpdate, proton.EventUpdateFlags: - if err := user.handleUpdateLabelEvent(ctx, event); err != nil { + updates, err := user.handleUpdateLabelEvent(ctx, event) + if err != nil { return fmt.Errorf("failed to handle update label event: %w", err) } + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + case proton.EventDelete: - if err := user.handleDeleteLabelEvent(ctx, event); err != nil { + updates, err := user.handleDeleteLabelEvent(ctx, event) + if err != nil { return fmt.Errorf("failed to handle delete label event: %w", err) } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } } } return nil } -func (user *User) handleCreateLabelEvent(_ context.Context, event proton.LabelEvent) error { //nolint:unparam - return safe.LockRet(func() error { +func (user *User) handleCreateLabelEvent(ctx context.Context, event proton.LabelEvent) ([]imap.Update, error) { //nolint:unparam + return safe.LockRetErr(func() ([]imap.Update, error) { + var updates []imap.Update user.log.WithFields(logrus.Fields{ "labelID": event.ID, "name": logging.Sensitive(event.Label.Name), @@ -325,7 +341,9 @@ func (user *User) handleCreateLabelEvent(_ context.Context, event proton.LabelEv } for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) { - updateCh.Enqueue(newMailboxCreatedUpdate(imap.MailboxID(event.ID), getMailboxName(event.Label))) + update := newMailboxCreatedUpdate(imap.MailboxID(event.ID), getMailboxName(event.Label)) + updateCh.Enqueue(update) + updates = append(updates, update) } user.eventCh.Enqueue(events.UserLabelCreated{ @@ -334,12 +352,13 @@ func (user *User) handleCreateLabelEvent(_ context.Context, event proton.LabelEv Name: event.Label.Name, }) - return nil + return updates, nil }, user.apiLabelsLock, user.updateChLock) } -func (user *User) handleUpdateLabelEvent(_ context.Context, event proton.LabelEvent) error { //nolint:unparam - return safe.LockRet(func() error { +func (user *User) handleUpdateLabelEvent(ctx context.Context, event proton.LabelEvent) ([]imap.Update, error) { //nolint:unparam + return safe.LockRetErr(func() ([]imap.Update, error) { + var updates []imap.Update user.log.WithFields(logrus.Fields{ "labelID": event.ID, "name": logging.Sensitive(event.Label.Name), @@ -350,10 +369,12 @@ func (user *User) handleUpdateLabelEvent(_ context.Context, event proton.LabelEv } for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) { - updateCh.Enqueue(imap.NewMailboxUpdated( + update := imap.NewMailboxUpdated( imap.MailboxID(event.ID), getMailboxName(event.Label), - )) + ) + updateCh.Enqueue(update) + updates = append(updates, update) } user.eventCh.Enqueue(events.UserLabelUpdated{ @@ -362,12 +383,14 @@ func (user *User) handleUpdateLabelEvent(_ context.Context, event proton.LabelEv Name: event.Label.Name, }) - return nil + return updates, nil }, user.apiLabelsLock, user.updateChLock) } -func (user *User) handleDeleteLabelEvent(_ context.Context, event proton.LabelEvent) error { //nolint:unparam - return safe.LockRet(func() error { +func (user *User) handleDeleteLabelEvent(ctx context.Context, event proton.LabelEvent) ([]imap.Update, error) { //nolint:unparam + return safe.LockRetErr(func() ([]imap.Update, error) { + var updates []imap.Update + user.log.WithField("labelID", event.ID).Info("Handling label deleted event") label, ok := user.apiLabels[event.ID] @@ -376,7 +399,9 @@ func (user *User) handleDeleteLabelEvent(_ context.Context, event proton.LabelEv } for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) { - updateCh.Enqueue(imap.NewMailboxDeleted(imap.MailboxID(event.ID))) + update := imap.NewMailboxDeleted(imap.MailboxID(event.ID)) + updateCh.Enqueue(update) + updates = append(updates, update) } user.eventCh.Enqueue(events.UserLabelDeleted{ @@ -385,7 +410,7 @@ func (user *User) handleDeleteLabelEvent(_ context.Context, event proton.LabelEv Name: label.Name, }) - return nil + return updates, nil }, user.apiLabelsLock, user.updateChLock) } @@ -396,26 +421,32 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto switch event.Action { case proton.EventCreate: - if err := user.handleCreateMessageEvent( + updates, err := user.handleCreateMessageEvent( logging.WithLogrusField(ctx, "action", "create message"), - event, - ); err != nil { + event) + if err != nil { if rerr := user.reporter.ReportMessageWithContext("Failed to apply create message event", reporter.Context{ "error": err, }); rerr != nil { user.log.WithError(err).Error("Failed to report create message event error") } + return fmt.Errorf("failed to handle create message event: %w", err) } + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + case proton.EventUpdate, proton.EventUpdateFlags: // Draft update means to completely remove old message and upload the new data again, but we should // only do this if the event is of type EventUpdate otherwise label switch operations will not work. if event.Message.IsDraft() && event.Action == proton.EventUpdate { - if err := user.handleUpdateDraftEvent( + updates, err := user.handleUpdateDraftEvent( logging.WithLogrusField(ctx, "action", "update draft"), event, - ); err != nil { + ) + if err != nil { if rerr := user.reporter.ReportMessageWithContext("Failed to apply update draft message event", reporter.Context{ "error": err, }); rerr != nil { @@ -424,6 +455,10 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto return fmt.Errorf("failed to handle update draft event: %w", err) } + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + return nil } @@ -431,10 +466,11 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto // whether the flags, labels or read only data (header+body) has been changed. This requires fixing proton // first so that it correctly reports those cases. // Issue regular update to handle mailboxes and flag changes. - if err := user.handleUpdateMessageEvent( + updates, err := user.handleUpdateMessageEvent( logging.WithLogrusField(ctx, "action", "update message"), event, - ); err != nil { + ) + if err != nil { if rerr := user.reporter.ReportMessageWithContext("Failed to apply update message event", reporter.Context{ "error": err, }); rerr != nil { @@ -443,11 +479,16 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto return fmt.Errorf("failed to handle update message event: %w", err) } + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } + case proton.EventDelete: - if err := user.handleDeleteMessageEvent( + updates, err := user.handleDeleteMessageEvent( logging.WithLogrusField(ctx, "action", "delete message"), event, - ); err != nil { + ) + if err != nil { if rerr := user.reporter.ReportMessageWithContext("Failed to apply delete message event", reporter.Context{ "error": err, }); rerr != nil { @@ -455,25 +496,30 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []proto } return fmt.Errorf("failed to handle delete message event: %w", err) } + + if err := waitOnIMAPUpdates(ctx, updates); err != nil { + return err + } } } return nil } -func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.MessageEvent) error { +func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) { full, err := user.client.GetFullMessage(ctx, event.Message.ID) if err != nil { - return fmt.Errorf("failed to get full message: %w", err) + return nil, fmt.Errorf("failed to get full message: %w", err) } - return safe.RLockRet(func() error { + return safe.RLockRetErr(func() ([]imap.Update, error) { user.log.WithFields(logrus.Fields{ "messageID": event.ID, "subject": logging.Sensitive(event.Message.Subject), }).Info("Handling message created event") - return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error { + var update imap.Update + if err := withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error { res := buildRFC822(user.apiLabels, full, addrKR) if res.err != nil { @@ -497,45 +543,56 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.Mes user.log.WithError(err).Error("Failed to remove failed message ID from vault") } - user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(false, res.update)) + update = imap.NewMessagesCreated(false, res.update) + user.updateCh[full.AddressID].Enqueue(update) return nil - }) + }); err != nil { + return nil, err + } + + return []imap.Update{update}, nil }, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock) } -func (user *User) handleUpdateMessageEvent(ctx context.Context, event proton.MessageEvent) error { //nolint:unparam - return safe.RLockRet(func() error { +func (user *User) handleUpdateMessageEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) { //nolint:unparam + return safe.RLockRetErr(func() ([]imap.Update, error) { user.log.WithFields(logrus.Fields{ "messageID": event.ID, "subject": logging.Sensitive(event.Message.Subject), }).Info("Handling message updated event") - user.updateCh[event.Message.AddressID].Enqueue(imap.NewMessageMailboxesUpdated( + update := imap.NewMessageMailboxesUpdated( imap.MessageID(event.ID), mapTo[string, imap.MailboxID](wantLabels(user.apiLabels, event.Message.LabelIDs)), event.Message.Seen(), event.Message.Starred(), - )) + ) - return nil + user.updateCh[event.Message.AddressID].Enqueue(update) + + return []imap.Update{update}, nil }, user.apiLabelsLock, user.updateChLock) } -func (user *User) handleDeleteMessageEvent(ctx context.Context, event proton.MessageEvent) error { //nolint:unparam - return safe.RLockRet(func() error { +func (user *User) handleDeleteMessageEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) { //nolint:unparam + return safe.RLockRetErr(func() ([]imap.Update, error) { user.log.WithField("messageID", event.ID).Info("Handling message deleted event") + var updates []imap.Update + for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) { - updateCh.Enqueue(imap.NewMessagesDeleted(imap.MessageID(event.ID))) + update := imap.NewMessagesDeleted(imap.MessageID(event.ID)) + updateCh.Enqueue(update) + updates = append(updates, update) } - return nil + return updates, nil }, user.updateChLock) } -func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.MessageEvent) error { //nolint:unparam - return safe.RLockRet(func() error { +func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.MessageEvent) ([]imap.Update, error) { //nolint:unparam + return safe.RLockRetErr(func() ([]imap.Update, error) { user.log.WithFields(logrus.Fields{ "messageID": event.ID, "subject": logging.Sensitive(event.Message.Subject), @@ -543,10 +600,12 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.Messa full, err := user.client.GetFullMessage(ctx, event.Message.ID) if err != nil { - return fmt.Errorf("failed to get full draft: %w", err) + return nil, fmt.Errorf("failed to get full draft: %w", err) } - return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error { + var update imap.Update + + if err := withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error { res := buildRFC822(user.apiLabels, full, addrKR) if res.err != nil { @@ -570,15 +629,21 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.Messa user.log.WithError(err).Error("Failed to remove failed message ID from vault") } - user.updateCh[full.AddressID].Enqueue(imap.NewMessageUpdated( + update = imap.NewMessageUpdated( res.update.Message, res.update.Literal, res.update.MailboxIDs, res.update.ParsedMessage, - )) + ) + + user.updateCh[full.AddressID].Enqueue(update) return nil - }) + }); err != nil { + return nil, err + } + + return []imap.Update{update}, nil }, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock) } @@ -602,3 +667,14 @@ func getMailboxName(label proton.Label) []string { return name } + +func waitOnIMAPUpdates(ctx context.Context, updates []imap.Update) error { + for _, update := range updates { + err, ok := update.WaitContext(ctx) + if ok && err != nil { + return fmt.Errorf("failed to apply gluon update %v :%w", update.String(), err) + } + } + + return nil +} diff --git a/internal/user/sync.go b/internal/user/sync.go index 44f92834..6e4faeea 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -164,10 +164,14 @@ func (user *User) sync(ctx context.Context) error { // nolint:exhaustive func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh ...*queue.QueuedChannel[imap.Update]) error { + var updates []imap.Update + // Create placeholder Folders/Labels mailboxes with a random ID and with the \Noselect attribute. for _, prefix := range []string{folderPrefix, labelPrefix} { for _, updateCh := range updateCh { - updateCh.Enqueue(newPlaceHolderMailboxCreatedUpdate(prefix)) + update := newPlaceHolderMailboxCreatedUpdate(prefix) + updateCh.Enqueue(update) + updates = append(updates, update) } } @@ -180,12 +184,16 @@ func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh switch label.Type { case proton.LabelTypeSystem: for _, updateCh := range updateCh { - updateCh.Enqueue(newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name)) + update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name) + updateCh.Enqueue(update) + updates = append(updates, update) } case proton.LabelTypeFolder, proton.LabelTypeLabel: for _, updateCh := range updateCh { - updateCh.Enqueue(newMailboxCreatedUpdate(imap.MailboxID(labelID), getMailboxName(label))) + update := newMailboxCreatedUpdate(imap.MailboxID(labelID), getMailboxName(label)) + updateCh.Enqueue(update) + updates = append(updates, update) } default: @@ -194,11 +202,11 @@ func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh } // Wait for all label updates to be applied. - for _, updateCh := range updateCh { - update := imap.NewNoop() - defer update.WaitContext(ctx) - - updateCh.Enqueue(update) + for _, update := range updates { + err, ok := update.WaitContext(ctx) + if ok && err != nil { + return fmt.Errorf("failed to apply label create update in gluon %v: %w", update.String(), err) + } } return nil @@ -243,9 +251,9 @@ func syncMessages( defer syncReporter.done() type flushUpdate struct { - messageID string - noOps []*imap.Noop - batchLen int + messageID string + pushedUpdates []imap.Update + batchLen int } // The higher this value, the longer we can continue our download iteration before being blocked on channel writes @@ -322,29 +330,26 @@ func syncMessages( flushers[res.addressID].push(res.update) } + var pushedUpdates []imap.Update for _, flusher := range flushers { flusher.flush() - } - - noopUpdates := make([]*imap.Noop, len(updateCh)) - index := 0 - for _, updateCh := range updateCh { - noopUpdates[index] = imap.NewNoop() - updateCh.Enqueue(noopUpdates[index]) - index++ + pushedUpdates = append(pushedUpdates, flusher.collectPushedUpdates()...) } flushUpdateCh <- flushUpdate{ - messageID: batch[0].messageID, - noOps: noopUpdates, - batchLen: len(batch), + messageID: batch[0].messageID, + pushedUpdates: pushedUpdates, + batchLen: len(batch), } } }() for flushUpdate := range flushUpdateCh { - for _, up := range flushUpdate.noOps { - up.WaitContext(ctx) + for _, up := range flushUpdate.pushedUpdates { + err, ok := up.WaitContext(ctx) + if ok && err != nil { + return fmt.Errorf("failed to apply sync update to gluon %v: %w", up.String(), err) + } } if err := vault.SetLastMessageID(flushUpdate.messageID); err != nil { diff --git a/internal/user/sync_flusher.go b/internal/user/sync_flusher.go index 8333b6de..0c0dd2dd 100644 --- a/internal/user/sync_flusher.go +++ b/internal/user/sync_flusher.go @@ -23,8 +23,9 @@ import ( ) type flusher struct { - updateCh *queue.QueuedChannel[imap.Update] - updates []*imap.MessageCreated + updateCh *queue.QueuedChannel[imap.Update] + updates []*imap.MessageCreated + pushedUpdates []imap.Update maxUpdateSize int curChunkSize int @@ -47,8 +48,16 @@ func (f *flusher) push(update *imap.MessageCreated) { func (f *flusher) flush() { if len(f.updates) > 0 { - f.updateCh.Enqueue(imap.NewMessagesCreated(true, f.updates...)) + update := imap.NewMessagesCreated(true, f.updates...) + f.updateCh.Enqueue(update) f.updates = nil f.curChunkSize = 0 + f.pushedUpdates = append(f.pushedUpdates, update) } } + +func (f *flusher) collectPushedUpdates() []imap.Update { + updates := f.pushedUpdates + f.pushedUpdates = nil + return updates +} diff --git a/internal/user/user.go b/internal/user/user.go index c943fa6e..3c89b904 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -565,18 +565,6 @@ func (user *User) doEventPoll(ctx context.Context) error { user.log.WithField("event", event).Debug("Handled API event") - // Wait for all events to be applied. - safe.RLock(func() { - for _, updateCh := range xslices.Unique(maps.Values(user.updateCh)) { - update := imap.NewNoop() - defer update.WaitContext(ctx) - - updateCh.Enqueue(update) - } - }, user.updateChLock) - - user.log.WithField("event", event).Debug("All events applied to gluon") - // Update the event ID in the vault. if err := user.vault.SetEventID(event.EventID); err != nil { return fmt.Errorf("failed to update event ID: %w", err) diff --git a/internal/user/user_test.go b/internal/user/user_test.go index 787d5910..3319f7ad 100644 --- a/internal/user/user_test.go +++ b/internal/user/user_test.go @@ -110,7 +110,7 @@ func TestUser_AddressMode(t *testing.T) { for _, imapConn := range imapConn { go func(imapConn connector.Connector) { for update := range imapConn.GetUpdates() { - update.Done() + update.Done(nil) } }(imapConn) } @@ -226,7 +226,7 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma for _, imapConn := range imapConn { go func(imapConn connector.Connector) { for update := range imapConn.GetUpdates() { - update.Done() + update.Done(nil) } }(imapConn) }