forked from Silverfish/proton-bridge
Other: Only update event ID in vault once all gluon updates were applied
This commit is contained in:
@ -334,7 +334,10 @@ func (user *User) handleUpdateLabelEvent(_ context.Context, event liteapi.LabelE
|
|||||||
user.apiLabels[event.Label.ID] = event.Label
|
user.apiLabels[event.Label.ID] = event.Label
|
||||||
|
|
||||||
for _, updateCh := range user.updateCh {
|
for _, updateCh := range user.updateCh {
|
||||||
updateCh.Enqueue(imap.NewMailboxUpdated(imap.MailboxID(event.ID), getMailboxName(event.Label)))
|
updateCh.Enqueue(imap.NewMailboxUpdated(
|
||||||
|
imap.MailboxID(event.ID),
|
||||||
|
getMailboxName(event.Label),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
user.eventCh.Enqueue(events.UserLabelUpdated{
|
user.eventCh.Enqueue(events.UserLabelUpdated{
|
||||||
@ -453,14 +456,12 @@ func (user *User) handleUpdateMessageEvent(ctx context.Context, event liteapi.Me
|
|||||||
"subject": logging.Sensitive(event.Message.Subject),
|
"subject": logging.Sensitive(event.Message.Subject),
|
||||||
}).Info("Handling message updated event")
|
}).Info("Handling message updated event")
|
||||||
|
|
||||||
update := imap.NewMessageMailboxesUpdated(
|
user.updateCh[event.Message.AddressID].Enqueue(imap.NewMessageMailboxesUpdated(
|
||||||
imap.MessageID(event.ID),
|
imap.MessageID(event.ID),
|
||||||
mapTo[string, imap.MailboxID](xslices.Filter(event.Message.LabelIDs, wantLabelID)),
|
mapTo[string, imap.MailboxID](xslices.Filter(event.Message.LabelIDs, wantLabelID)),
|
||||||
event.Message.Seen(),
|
event.Message.Seen(),
|
||||||
event.Message.Starred(),
|
event.Message.Starred(),
|
||||||
)
|
))
|
||||||
|
|
||||||
user.updateCh[event.Message.AddressID].Enqueue(update)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}, user.updateChLock)
|
}, user.updateChLock)
|
||||||
@ -471,11 +472,7 @@ func (user *User) handleDeleteMessageEvent(ctx context.Context, event liteapi.Me
|
|||||||
user.log.WithField("messageID", event.ID).Info("Handling message deleted event")
|
user.log.WithField("messageID", event.ID).Info("Handling message deleted event")
|
||||||
|
|
||||||
for _, updateCh := range user.updateCh {
|
for _, updateCh := range user.updateCh {
|
||||||
update := imap.NewMessagesDeleted(
|
updateCh.Enqueue(imap.NewMessagesDeleted(imap.MessageID(event.ID)))
|
||||||
imap.MessageID(event.ID),
|
|
||||||
)
|
|
||||||
|
|
||||||
updateCh.Enqueue(update)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -500,9 +497,13 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event liteapi.Mess
|
|||||||
return fmt.Errorf("failed to build RFC822 draft: %w", err)
|
return fmt.Errorf("failed to build RFC822 draft: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
update := imap.NewMessageUpdated(buildRes.update.Message, buildRes.update.Literal, buildRes.update.MailboxIDs, buildRes.update.ParsedMessage)
|
user.updateCh[full.AddressID].Enqueue(imap.NewMessageUpdated(
|
||||||
|
buildRes.update.Message,
|
||||||
|
buildRes.update.Literal,
|
||||||
|
buildRes.update.MailboxIDs,
|
||||||
|
buildRes.update.ParsedMessage,
|
||||||
|
))
|
||||||
|
|
||||||
user.updateCh[full.AddressID].Enqueue(update)
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}, user.apiUserLock, user.apiAddrsLock, user.updateChLock)
|
}, user.apiUserLock, user.apiAddrsLock, user.updateChLock)
|
||||||
|
|||||||
@ -516,12 +516,26 @@ func (user *User) doEventPoll(ctx context.Context) error {
|
|||||||
"new": event,
|
"new": event,
|
||||||
}).Info("Received new API event")
|
}).Info("Received new API event")
|
||||||
|
|
||||||
|
// Handle the event.
|
||||||
if err := user.handleAPIEvent(ctx, event); err != nil {
|
if err := user.handleAPIEvent(ctx, event); err != nil {
|
||||||
return fmt.Errorf("failed to handle event: %w", err)
|
return fmt.Errorf("failed to handle event: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
user.log.WithField("event", event).Debug("Handled API event")
|
user.log.WithField("event", event).Debug("Handled API event")
|
||||||
|
|
||||||
|
// Wait for all events to be applied.
|
||||||
|
safe.RLock(func() {
|
||||||
|
for _, updateCh := range user.updateCh {
|
||||||
|
update := imap.NewNoop()
|
||||||
|
defer update.WaitContext(ctx)
|
||||||
|
|
||||||
|
updateCh.Enqueue(update)
|
||||||
|
}
|
||||||
|
}, user.updateChLock)
|
||||||
|
|
||||||
|
user.log.WithField("event", event).Debug("All events applied to gluon")
|
||||||
|
|
||||||
|
// Update the event ID in the vault.
|
||||||
if err := user.vault.SetEventID(event.EventID); err != nil {
|
if err := user.vault.SetEventID(event.EventID); err != nil {
|
||||||
return fmt.Errorf("failed to update event ID: %w", err)
|
return fmt.Errorf("failed to update event ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user