diff --git a/internal/bridge/user.go b/internal/bridge/user.go index 3e34058c..b13868c3 100644 --- a/internal/bridge/user.go +++ b/internal/bridge/user.go @@ -441,9 +441,9 @@ func (bridge *Bridge) addUserWithVault( ctx, vault, client, + bridge.reporter, apiUser, bridge.crashHandler, - bridge.reporter, bridge.vault.SyncWorkers(), bridge.vault.GetShowAllMail(), ) diff --git a/internal/user/events.go b/internal/user/events.go index 41b47a1e..adcda965 100644 --- a/internal/user/events.go +++ b/internal/user/events.go @@ -438,12 +438,12 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.Mes }).Info("Handling message created event") return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error { - buildRes, err := buildRFC822(user.apiLabels, full, addrKR) + update, err := buildRFC822(user.apiLabels, full, addrKR).update.unpack() if err != nil { return fmt.Errorf("failed to build RFC822 message: %w", err) } - user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(buildRes.update)) + user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(update)) return nil }) @@ -493,16 +493,16 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.Messa } return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error { - buildRes, err := buildRFC822(user.apiLabels, full, addrKR) + update, err := buildRFC822(user.apiLabels, full, addrKR).update.unpack() if err != nil { return fmt.Errorf("failed to build RFC822 draft: %w", err) } user.updateCh[full.AddressID].Enqueue(imap.NewMessageUpdated( - buildRes.update.Message, - buildRes.update.Literal, - buildRes.update.MailboxIDs, - buildRes.update.ParsedMessage, + update.Message, + update.Literal, + update.MailboxIDs, + update.ParsedMessage, )) return nil diff --git a/internal/user/sync.go b/internal/user/sync.go index 611e23d2..173bbbf0 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -26,6 +26,7 @@ import ( "github.com/ProtonMail/gluon/imap" "github.com/ProtonMail/gluon/queue" + "github.com/ProtonMail/gluon/reporter" "github.com/ProtonMail/go-proton-api" "github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/ProtonMail/proton-bridge/v3/internal/events" @@ -36,6 +37,7 @@ import ( "github.com/google/uuid" "github.com/sirupsen/logrus" "golang.org/x/exp/maps" + "golang.org/x/exp/slices" ) const ( @@ -87,6 +89,7 @@ func (user *User) doSync(ctx context.Context) error { return nil } +// nolint:funlen func (user *User) sync(ctx context.Context) error { return safe.RLockRet(func() error { return withAddrKRs(user.apiUser, user.apiAddrs, user.vault.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error { @@ -109,10 +112,32 @@ func (user *User) sync(ctx context.Context) error { if !user.vault.SyncStatus().HasMessages { user.log.Info("Syncing messages") + // Determine which messages to sync. + messageIDs, err := user.client.GetMessageIDs(ctx, "") + if err != nil { + return fmt.Errorf("failed to get message IDs to sync: %w", err) + } + + // Remove any messages that have already failed to sync. + messageIDs = xslices.Filter(messageIDs, func(messageID string) bool { + return !slices.Contains(user.vault.SyncStatus().FailedMessageIDs, messageID) + }) + + // Reverse the order of the message IDs so that the newest messages are synced first. + xslices.Reverse(messageIDs) + + // If we have a message ID that we've already synced, then we can skip all messages before it. + if idx := xslices.Index(messageIDs, user.vault.SyncStatus().LastMessageID); idx >= 0 { + messageIDs = messageIDs[idx+1:] + } + + // Sync the messages. if err := syncMessages( ctx, user.ID(), + messageIDs, user.client, + user.reporter, user.vault, user.apiLabels, addrKRs, @@ -183,7 +208,9 @@ func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh func syncMessages( ctx context.Context, userID string, + messageIDs []string, client *proton.Client, + sentry reporter.Reporter, vault *vault.User, apiLabels map[string]proton.Label, addrKRs map[string]*crypto.KeyRing, @@ -194,20 +221,6 @@ func syncMessages( ctx, cancel := context.WithCancel(ctx) defer cancel() - // Determine which messages to sync. - messageIDs, err := client.GetMessageIDs(ctx, "") - if err != nil { - return fmt.Errorf("failed to get message IDs to sync: %w", err) - } - - // Reverse the order of the message IDs so that the newest messages are synced first. - xslices.Reverse(messageIDs) - - // If we have a message ID that we've already synced, then we can skip all messages before it. - if idx := xslices.Index(messageIDs, vault.SyncStatus().LastMessageID); idx >= 0 { - messageIDs = messageIDs[idx+1:] - } - // Track the amount of time to process all the messages. syncStartTime := time.Now() defer func() { logrus.WithField("duration", time.Since(syncStartTime)).Info("Message sync completed") }() @@ -222,14 +235,12 @@ func syncMessages( flushers := make(map[string]*flusher, len(updateCh)) for addrID, updateCh := range updateCh { - flusher := newFlusher(updateCh, maxUpdateSize) - - flushers[addrID] = flusher + flushers[addrID] = newFlusher(updateCh, maxUpdateSize) } // Create a reporter to report sync progress updates. - reporter := newReporter(userID, eventCh, len(messageIDs), time.Second) - defer reporter.done() + syncReporter := newSyncReporter(userID, eventCh, len(messageIDs), time.Second) + defer syncReporter.done() type flushUpdate struct { messageID string @@ -267,7 +278,7 @@ func syncMessages( return nil, ctx.Err() } - return buildRFC822(apiLabels, msg, addrKRs[msg.AddressID]) + return buildRFC822(apiLabels, msg, addrKRs[msg.AddressID]), nil }) if err != nil { errorCh <- err @@ -289,7 +300,26 @@ func syncMessages( defer close(flushUpdateCh) for batch := range flushCh { for _, res := range batch { - flushers[res.addressID].push(res.update) + if err := res.update.err(); err != nil { + if err := vault.AddFailedMessageID(res.messageID); err != nil { + logrus.WithError(err).Error("Failed to add failed message ID") + } + + if err := sentry.ReportMessageWithContext("Failed to sync message", reporter.Context{ + "messageID": res.messageID, + "error": err, + }); err != nil { + logrus.WithError(err).Error("Failed to report message sync error") + } + + continue + } else { + if err := vault.RemFailedMessageID(res.messageID); err != nil { + logrus.WithError(err).Error("Failed to remove failed message ID") + } + + flushers[res.addressID].push(res.update.unwrap()) + } } for _, flusher := range flushers { @@ -321,7 +351,7 @@ func syncMessages( return fmt.Errorf("failed to set last synced message ID: %w", err) } - reporter.add(flushUpdate.batchLen) + syncReporter.add(flushUpdate.batchLen) } return <-errorCh diff --git a/internal/user/sync_build.go b/internal/user/sync_build.go index 897474a0..b5cfbd58 100644 --- a/internal/user/sync_build.go +++ b/internal/user/sync_build.go @@ -26,10 +26,39 @@ import ( "github.com/ProtonMail/proton-bridge/v3/pkg/message" ) +type result[T any] struct { + v T + e error +} + +func resOk[T any](v T) result[T] { + return result[T]{v: v} +} + +func resErr[T any](e error) result[T] { + return result[T]{e: e} +} + +func (r *result[T]) unwrap() T { + if r.e != nil { + panic(r.err) + } + + return r.v +} + +func (r *result[T]) unpack() (T, error) { + return r.v, r.e +} + +func (r *result[T]) err() error { + return r.e +} + type buildRes struct { messageID string addressID string - update *imap.MessageCreated + update result[*imap.MessageCreated] } func defaultJobOpts() message.JobOptions { @@ -43,22 +72,22 @@ func defaultJobOpts() message.JobOptions { } } -func buildRFC822(apiLabels map[string]proton.Label, full proton.FullMessage, addrKR *crypto.KeyRing) (*buildRes, error) { - literal, err := message.BuildRFC822(addrKR, full.Message, full.AttData, defaultJobOpts()) - if err != nil { - return nil, fmt.Errorf("failed to build message %s: %w", full.ID, err) - } +func buildRFC822(apiLabels map[string]proton.Label, full proton.FullMessage, addrKR *crypto.KeyRing) *buildRes { + var update result[*imap.MessageCreated] - update, err := newMessageCreatedUpdate(apiLabels, full.MessageMetadata, literal) - if err != nil { - return nil, fmt.Errorf("failed to create IMAP update for message %s: %w", full.ID, err) + if literal, err := message.BuildRFC822(addrKR, full.Message, full.AttData, defaultJobOpts()); err != nil { + update = resErr[*imap.MessageCreated](fmt.Errorf("failed to build RFC822 for message %s: %w", full.ID, err)) + } else if created, err := newMessageCreatedUpdate(apiLabels, full.MessageMetadata, literal); err != nil { + update = resErr[*imap.MessageCreated](fmt.Errorf("failed to create IMAP update for message %s: %w", full.ID, err)) + } else { + update = resOk(created) } return &buildRes{ messageID: full.ID, addressID: full.AddressID, update: update, - }, nil + } } func newMessageCreatedUpdate( diff --git a/internal/user/sync_reporter.go b/internal/user/sync_reporter.go index 3149e8cc..87722bb9 100644 --- a/internal/user/sync_reporter.go +++ b/internal/user/sync_reporter.go @@ -24,7 +24,7 @@ import ( "github.com/ProtonMail/proton-bridge/v3/internal/events" ) -type reporter struct { +type syncReporter struct { userID string eventCh *queue.QueuedChannel[events.Event] @@ -36,8 +36,8 @@ type reporter struct { freq time.Duration } -func newReporter(userID string, eventCh *queue.QueuedChannel[events.Event], total int, freq time.Duration) *reporter { - return &reporter{ +func newSyncReporter(userID string, eventCh *queue.QueuedChannel[events.Event], total int, freq time.Duration) *syncReporter { + return &syncReporter{ userID: userID, eventCh: eventCh, @@ -47,7 +47,7 @@ func newReporter(userID string, eventCh *queue.QueuedChannel[events.Event], tota } } -func (rep *reporter) add(delta int) { +func (rep *syncReporter) add(delta int) { rep.count += delta if time.Since(rep.last) > rep.freq { @@ -62,7 +62,7 @@ func (rep *reporter) add(delta int) { } } -func (rep *reporter) done() { +func (rep *syncReporter) done() { rep.eventCh.Enqueue(events.SyncProgress{ UserID: rep.userID, Progress: 1, diff --git a/internal/user/user.go b/internal/user/user.go index 28aebc0c..ccee8d65 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -29,7 +29,7 @@ import ( "github.com/ProtonMail/gluon/connector" "github.com/ProtonMail/gluon/imap" "github.com/ProtonMail/gluon/queue" - gluonReporter "github.com/ProtonMail/gluon/reporter" + "github.com/ProtonMail/gluon/reporter" "github.com/ProtonMail/go-proton-api" "github.com/ProtonMail/proton-bridge/v3/internal/async" "github.com/ProtonMail/proton-bridge/v3/internal/events" @@ -57,6 +57,7 @@ type User struct { vault *vault.User client *proton.Client + reporter reporter.Reporter eventCh *queue.QueuedChannel[events.Event] sendHash *sendRecorder @@ -72,8 +73,6 @@ type User struct { updateCh map[string]*queue.QueuedChannel[imap.Update] updateChLock safe.RWMutex - reporter gluonReporter.Reporter - tasks *async.Group abortable async.Abortable goSync func() @@ -92,9 +91,9 @@ func New( ctx context.Context, encVault *vault.User, client *proton.Client, + reporter reporter.Reporter, apiUser proton.User, crashHandler async.PanicHandler, - reporter gluonReporter.Reporter, syncWorkers int, showAllMail bool, ) (*User, error) { //nolint:funlen @@ -118,6 +117,7 @@ func New( vault: encVault, client: client, + reporter: reporter, eventCh: queue.NewQueuedChannel[events.Event](0, 0), sendHash: newSendRecorder(sendEntryExpiry), @@ -133,8 +133,6 @@ func New( updateCh: make(map[string]*queue.QueuedChannel[imap.Update]), updateChLock: safe.NewRWMutex(), - reporter: reporter, - tasks: async.NewGroup(context.Background(), crashHandler), pollAPIEventsCh: make(chan chan struct{}), diff --git a/internal/user/user_test.go b/internal/user/user_test.go index 524c5297..66bba5e0 100644 --- a/internal/user/user_test.go +++ b/internal/user/user_test.go @@ -218,7 +218,7 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma vaultUser, err := vault.AddUser(apiUser.ID, username, apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass) require.NoError(tb, err) - user, err := New(ctx, vaultUser, client, apiUser, nil, nil, vault.SyncWorkers(), true) + user, err := New(ctx, vaultUser, client, nil, apiUser, nil, vault.SyncWorkers(), true) require.NoError(tb, err) defer user.Close() diff --git a/internal/vault/types_user.go b/internal/vault/types_user.go index 9673d554..b58c76c2 100644 --- a/internal/vault/types_user.go +++ b/internal/vault/types_user.go @@ -60,9 +60,10 @@ func (mode AddressMode) String() string { } type SyncStatus struct { - HasLabels bool - HasMessages bool - LastMessageID string + HasLabels bool + HasMessages bool + LastMessageID string + FailedMessageIDs []string } func (status SyncStatus) IsComplete() bool { diff --git a/internal/vault/user.go b/internal/vault/user.go index 38ee3ce9..50867f42 100644 --- a/internal/vault/user.go +++ b/internal/vault/user.go @@ -21,6 +21,8 @@ import ( "fmt" "github.com/ProtonMail/gluon/imap" + "github.com/bradenaw/juniper/xslices" + "golang.org/x/exp/slices" ) type User struct { @@ -158,6 +160,24 @@ func (user *User) SetLastMessageID(messageID string) error { }) } +// AddFailedMessageID adds a message ID to the list of failed message IDs. +func (user *User) AddFailedMessageID(messageID string) error { + return user.vault.modUser(user.userID, func(data *UserData) { + if !slices.Contains(data.SyncStatus.FailedMessageIDs, messageID) { + data.SyncStatus.FailedMessageIDs = append(data.SyncStatus.FailedMessageIDs, messageID) + } + }) +} + +// RemFailedMessageID removes a message ID from the list of failed message IDs. +func (user *User) RemFailedMessageID(messageID string) error { + return user.vault.modUser(user.userID, func(data *UserData) { + data.SyncStatus.FailedMessageIDs = xslices.Filter(data.SyncStatus.FailedMessageIDs, func(otherID string) bool { + return otherID != messageID + }) + }) +} + // ClearSyncStatus clears the user's sync status. func (user *User) ClearSyncStatus() error { return user.vault.modUser(user.userID, func(data *UserData) {