GODT-2187: Skip messages during sync that fail to build/parse

This commit is contained in:
James Houlahan
2022-12-06 13:04:24 +01:00
parent 01c12655b8
commit 58d04f9693
9 changed files with 133 additions and 55 deletions

View File

@ -441,9 +441,9 @@ func (bridge *Bridge) addUserWithVault(
ctx, ctx,
vault, vault,
client, client,
bridge.reporter,
apiUser, apiUser,
bridge.crashHandler, bridge.crashHandler,
bridge.reporter,
bridge.vault.SyncWorkers(), bridge.vault.SyncWorkers(),
bridge.vault.GetShowAllMail(), bridge.vault.GetShowAllMail(),
) )

View File

@ -438,12 +438,12 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, event proton.Mes
}).Info("Handling message created event") }).Info("Handling message created event")
return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error { return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error {
buildRes, err := buildRFC822(user.apiLabels, full, addrKR) update, err := buildRFC822(user.apiLabels, full, addrKR).update.unpack()
if err != nil { if err != nil {
return fmt.Errorf("failed to build RFC822 message: %w", err) return fmt.Errorf("failed to build RFC822 message: %w", err)
} }
user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(buildRes.update)) user.updateCh[full.AddressID].Enqueue(imap.NewMessagesCreated(update))
return nil return nil
}) })
@ -493,16 +493,16 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.Messa
} }
return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error { return withAddrKR(user.apiUser, user.apiAddrs[event.Message.AddressID], user.vault.KeyPass(), func(_, addrKR *crypto.KeyRing) error {
buildRes, err := buildRFC822(user.apiLabels, full, addrKR) update, err := buildRFC822(user.apiLabels, full, addrKR).update.unpack()
if err != nil { if err != nil {
return fmt.Errorf("failed to build RFC822 draft: %w", err) return fmt.Errorf("failed to build RFC822 draft: %w", err)
} }
user.updateCh[full.AddressID].Enqueue(imap.NewMessageUpdated( user.updateCh[full.AddressID].Enqueue(imap.NewMessageUpdated(
buildRes.update.Message, update.Message,
buildRes.update.Literal, update.Literal,
buildRes.update.MailboxIDs, update.MailboxIDs,
buildRes.update.ParsedMessage, update.ParsedMessage,
)) ))
return nil return nil

View File

@ -26,6 +26,7 @@ import (
"github.com/ProtonMail/gluon/imap" "github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/queue" "github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/go-proton-api" "github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/ProtonMail/proton-bridge/v3/internal/events" "github.com/ProtonMail/proton-bridge/v3/internal/events"
@ -36,6 +37,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
"golang.org/x/exp/slices"
) )
const ( const (
@ -87,6 +89,7 @@ func (user *User) doSync(ctx context.Context) error {
return nil return nil
} }
// nolint:funlen
func (user *User) sync(ctx context.Context) error { func (user *User) sync(ctx context.Context) error {
return safe.RLockRet(func() error { return safe.RLockRet(func() error {
return withAddrKRs(user.apiUser, user.apiAddrs, user.vault.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error { return withAddrKRs(user.apiUser, user.apiAddrs, user.vault.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error {
@ -109,10 +112,32 @@ func (user *User) sync(ctx context.Context) error {
if !user.vault.SyncStatus().HasMessages { if !user.vault.SyncStatus().HasMessages {
user.log.Info("Syncing messages") user.log.Info("Syncing messages")
// Determine which messages to sync.
messageIDs, err := user.client.GetMessageIDs(ctx, "")
if err != nil {
return fmt.Errorf("failed to get message IDs to sync: %w", err)
}
// Remove any messages that have already failed to sync.
messageIDs = xslices.Filter(messageIDs, func(messageID string) bool {
return !slices.Contains(user.vault.SyncStatus().FailedMessageIDs, messageID)
})
// Reverse the order of the message IDs so that the newest messages are synced first.
xslices.Reverse(messageIDs)
// If we have a message ID that we've already synced, then we can skip all messages before it.
if idx := xslices.Index(messageIDs, user.vault.SyncStatus().LastMessageID); idx >= 0 {
messageIDs = messageIDs[idx+1:]
}
// Sync the messages.
if err := syncMessages( if err := syncMessages(
ctx, ctx,
user.ID(), user.ID(),
messageIDs,
user.client, user.client,
user.reporter,
user.vault, user.vault,
user.apiLabels, user.apiLabels,
addrKRs, addrKRs,
@ -183,7 +208,9 @@ func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh
func syncMessages( func syncMessages(
ctx context.Context, ctx context.Context,
userID string, userID string,
messageIDs []string,
client *proton.Client, client *proton.Client,
sentry reporter.Reporter,
vault *vault.User, vault *vault.User,
apiLabels map[string]proton.Label, apiLabels map[string]proton.Label,
addrKRs map[string]*crypto.KeyRing, addrKRs map[string]*crypto.KeyRing,
@ -194,20 +221,6 @@ func syncMessages(
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
// Determine which messages to sync.
messageIDs, err := client.GetMessageIDs(ctx, "")
if err != nil {
return fmt.Errorf("failed to get message IDs to sync: %w", err)
}
// Reverse the order of the message IDs so that the newest messages are synced first.
xslices.Reverse(messageIDs)
// If we have a message ID that we've already synced, then we can skip all messages before it.
if idx := xslices.Index(messageIDs, vault.SyncStatus().LastMessageID); idx >= 0 {
messageIDs = messageIDs[idx+1:]
}
// Track the amount of time to process all the messages. // Track the amount of time to process all the messages.
syncStartTime := time.Now() syncStartTime := time.Now()
defer func() { logrus.WithField("duration", time.Since(syncStartTime)).Info("Message sync completed") }() defer func() { logrus.WithField("duration", time.Since(syncStartTime)).Info("Message sync completed") }()
@ -222,14 +235,12 @@ func syncMessages(
flushers := make(map[string]*flusher, len(updateCh)) flushers := make(map[string]*flusher, len(updateCh))
for addrID, updateCh := range updateCh { for addrID, updateCh := range updateCh {
flusher := newFlusher(updateCh, maxUpdateSize) flushers[addrID] = newFlusher(updateCh, maxUpdateSize)
flushers[addrID] = flusher
} }
// Create a reporter to report sync progress updates. // Create a reporter to report sync progress updates.
reporter := newReporter(userID, eventCh, len(messageIDs), time.Second) syncReporter := newSyncReporter(userID, eventCh, len(messageIDs), time.Second)
defer reporter.done() defer syncReporter.done()
type flushUpdate struct { type flushUpdate struct {
messageID string messageID string
@ -267,7 +278,7 @@ func syncMessages(
return nil, ctx.Err() return nil, ctx.Err()
} }
return buildRFC822(apiLabels, msg, addrKRs[msg.AddressID]) return buildRFC822(apiLabels, msg, addrKRs[msg.AddressID]), nil
}) })
if err != nil { if err != nil {
errorCh <- err errorCh <- err
@ -289,7 +300,26 @@ func syncMessages(
defer close(flushUpdateCh) defer close(flushUpdateCh)
for batch := range flushCh { for batch := range flushCh {
for _, res := range batch { for _, res := range batch {
flushers[res.addressID].push(res.update) if err := res.update.err(); err != nil {
if err := vault.AddFailedMessageID(res.messageID); err != nil {
logrus.WithError(err).Error("Failed to add failed message ID")
}
if err := sentry.ReportMessageWithContext("Failed to sync message", reporter.Context{
"messageID": res.messageID,
"error": err,
}); err != nil {
logrus.WithError(err).Error("Failed to report message sync error")
}
continue
} else {
if err := vault.RemFailedMessageID(res.messageID); err != nil {
logrus.WithError(err).Error("Failed to remove failed message ID")
}
flushers[res.addressID].push(res.update.unwrap())
}
} }
for _, flusher := range flushers { for _, flusher := range flushers {
@ -321,7 +351,7 @@ func syncMessages(
return fmt.Errorf("failed to set last synced message ID: %w", err) return fmt.Errorf("failed to set last synced message ID: %w", err)
} }
reporter.add(flushUpdate.batchLen) syncReporter.add(flushUpdate.batchLen)
} }
return <-errorCh return <-errorCh

View File

@ -26,10 +26,39 @@ import (
"github.com/ProtonMail/proton-bridge/v3/pkg/message" "github.com/ProtonMail/proton-bridge/v3/pkg/message"
) )
type result[T any] struct {
v T
e error
}
func resOk[T any](v T) result[T] {
return result[T]{v: v}
}
func resErr[T any](e error) result[T] {
return result[T]{e: e}
}
func (r *result[T]) unwrap() T {
if r.e != nil {
panic(r.err)
}
return r.v
}
func (r *result[T]) unpack() (T, error) {
return r.v, r.e
}
func (r *result[T]) err() error {
return r.e
}
type buildRes struct { type buildRes struct {
messageID string messageID string
addressID string addressID string
update *imap.MessageCreated update result[*imap.MessageCreated]
} }
func defaultJobOpts() message.JobOptions { func defaultJobOpts() message.JobOptions {
@ -43,22 +72,22 @@ func defaultJobOpts() message.JobOptions {
} }
} }
func buildRFC822(apiLabels map[string]proton.Label, full proton.FullMessage, addrKR *crypto.KeyRing) (*buildRes, error) { func buildRFC822(apiLabels map[string]proton.Label, full proton.FullMessage, addrKR *crypto.KeyRing) *buildRes {
literal, err := message.BuildRFC822(addrKR, full.Message, full.AttData, defaultJobOpts()) var update result[*imap.MessageCreated]
if err != nil {
return nil, fmt.Errorf("failed to build message %s: %w", full.ID, err)
}
update, err := newMessageCreatedUpdate(apiLabels, full.MessageMetadata, literal) if literal, err := message.BuildRFC822(addrKR, full.Message, full.AttData, defaultJobOpts()); err != nil {
if err != nil { update = resErr[*imap.MessageCreated](fmt.Errorf("failed to build RFC822 for message %s: %w", full.ID, err))
return nil, fmt.Errorf("failed to create IMAP update for message %s: %w", full.ID, err) } else if created, err := newMessageCreatedUpdate(apiLabels, full.MessageMetadata, literal); err != nil {
update = resErr[*imap.MessageCreated](fmt.Errorf("failed to create IMAP update for message %s: %w", full.ID, err))
} else {
update = resOk(created)
} }
return &buildRes{ return &buildRes{
messageID: full.ID, messageID: full.ID,
addressID: full.AddressID, addressID: full.AddressID,
update: update, update: update,
}, nil }
} }
func newMessageCreatedUpdate( func newMessageCreatedUpdate(

View File

@ -24,7 +24,7 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/events" "github.com/ProtonMail/proton-bridge/v3/internal/events"
) )
type reporter struct { type syncReporter struct {
userID string userID string
eventCh *queue.QueuedChannel[events.Event] eventCh *queue.QueuedChannel[events.Event]
@ -36,8 +36,8 @@ type reporter struct {
freq time.Duration freq time.Duration
} }
func newReporter(userID string, eventCh *queue.QueuedChannel[events.Event], total int, freq time.Duration) *reporter { func newSyncReporter(userID string, eventCh *queue.QueuedChannel[events.Event], total int, freq time.Duration) *syncReporter {
return &reporter{ return &syncReporter{
userID: userID, userID: userID,
eventCh: eventCh, eventCh: eventCh,
@ -47,7 +47,7 @@ func newReporter(userID string, eventCh *queue.QueuedChannel[events.Event], tota
} }
} }
func (rep *reporter) add(delta int) { func (rep *syncReporter) add(delta int) {
rep.count += delta rep.count += delta
if time.Since(rep.last) > rep.freq { if time.Since(rep.last) > rep.freq {
@ -62,7 +62,7 @@ func (rep *reporter) add(delta int) {
} }
} }
func (rep *reporter) done() { func (rep *syncReporter) done() {
rep.eventCh.Enqueue(events.SyncProgress{ rep.eventCh.Enqueue(events.SyncProgress{
UserID: rep.userID, UserID: rep.userID,
Progress: 1, Progress: 1,

View File

@ -29,7 +29,7 @@ import (
"github.com/ProtonMail/gluon/connector" "github.com/ProtonMail/gluon/connector"
"github.com/ProtonMail/gluon/imap" "github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/queue" "github.com/ProtonMail/gluon/queue"
gluonReporter "github.com/ProtonMail/gluon/reporter" "github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/go-proton-api" "github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/proton-bridge/v3/internal/async" "github.com/ProtonMail/proton-bridge/v3/internal/async"
"github.com/ProtonMail/proton-bridge/v3/internal/events" "github.com/ProtonMail/proton-bridge/v3/internal/events"
@ -57,6 +57,7 @@ type User struct {
vault *vault.User vault *vault.User
client *proton.Client client *proton.Client
reporter reporter.Reporter
eventCh *queue.QueuedChannel[events.Event] eventCh *queue.QueuedChannel[events.Event]
sendHash *sendRecorder sendHash *sendRecorder
@ -72,8 +73,6 @@ type User struct {
updateCh map[string]*queue.QueuedChannel[imap.Update] updateCh map[string]*queue.QueuedChannel[imap.Update]
updateChLock safe.RWMutex updateChLock safe.RWMutex
reporter gluonReporter.Reporter
tasks *async.Group tasks *async.Group
abortable async.Abortable abortable async.Abortable
goSync func() goSync func()
@ -92,9 +91,9 @@ func New(
ctx context.Context, ctx context.Context,
encVault *vault.User, encVault *vault.User,
client *proton.Client, client *proton.Client,
reporter reporter.Reporter,
apiUser proton.User, apiUser proton.User,
crashHandler async.PanicHandler, crashHandler async.PanicHandler,
reporter gluonReporter.Reporter,
syncWorkers int, syncWorkers int,
showAllMail bool, showAllMail bool,
) (*User, error) { //nolint:funlen ) (*User, error) { //nolint:funlen
@ -118,6 +117,7 @@ func New(
vault: encVault, vault: encVault,
client: client, client: client,
reporter: reporter,
eventCh: queue.NewQueuedChannel[events.Event](0, 0), eventCh: queue.NewQueuedChannel[events.Event](0, 0),
sendHash: newSendRecorder(sendEntryExpiry), sendHash: newSendRecorder(sendEntryExpiry),
@ -133,8 +133,6 @@ func New(
updateCh: make(map[string]*queue.QueuedChannel[imap.Update]), updateCh: make(map[string]*queue.QueuedChannel[imap.Update]),
updateChLock: safe.NewRWMutex(), updateChLock: safe.NewRWMutex(),
reporter: reporter,
tasks: async.NewGroup(context.Background(), crashHandler), tasks: async.NewGroup(context.Background(), crashHandler),
pollAPIEventsCh: make(chan chan struct{}), pollAPIEventsCh: make(chan chan struct{}),

View File

@ -218,7 +218,7 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma
vaultUser, err := vault.AddUser(apiUser.ID, username, apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass) vaultUser, err := vault.AddUser(apiUser.ID, username, apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass)
require.NoError(tb, err) require.NoError(tb, err)
user, err := New(ctx, vaultUser, client, apiUser, nil, nil, vault.SyncWorkers(), true) user, err := New(ctx, vaultUser, client, nil, apiUser, nil, vault.SyncWorkers(), true)
require.NoError(tb, err) require.NoError(tb, err)
defer user.Close() defer user.Close()

View File

@ -60,9 +60,10 @@ func (mode AddressMode) String() string {
} }
type SyncStatus struct { type SyncStatus struct {
HasLabels bool HasLabels bool
HasMessages bool HasMessages bool
LastMessageID string LastMessageID string
FailedMessageIDs []string
} }
func (status SyncStatus) IsComplete() bool { func (status SyncStatus) IsComplete() bool {

View File

@ -21,6 +21,8 @@ import (
"fmt" "fmt"
"github.com/ProtonMail/gluon/imap" "github.com/ProtonMail/gluon/imap"
"github.com/bradenaw/juniper/xslices"
"golang.org/x/exp/slices"
) )
type User struct { type User struct {
@ -158,6 +160,24 @@ func (user *User) SetLastMessageID(messageID string) error {
}) })
} }
// AddFailedMessageID adds a message ID to the list of failed message IDs.
func (user *User) AddFailedMessageID(messageID string) error {
return user.vault.modUser(user.userID, func(data *UserData) {
if !slices.Contains(data.SyncStatus.FailedMessageIDs, messageID) {
data.SyncStatus.FailedMessageIDs = append(data.SyncStatus.FailedMessageIDs, messageID)
}
})
}
// RemFailedMessageID removes a message ID from the list of failed message IDs.
func (user *User) RemFailedMessageID(messageID string) error {
return user.vault.modUser(user.userID, func(data *UserData) {
data.SyncStatus.FailedMessageIDs = xslices.Filter(data.SyncStatus.FailedMessageIDs, func(otherID string) bool {
return otherID != messageID
})
})
}
// ClearSyncStatus clears the user's sync status. // ClearSyncStatus clears the user's sync status.
func (user *User) ClearSyncStatus() error { func (user *User) ClearSyncStatus() error {
return user.vault.modUser(user.userID, func(data *UserData) { return user.vault.modUser(user.userID, func(data *UserData) {