diff --git a/internal/user/events.go b/internal/user/events.go index 51145e23..a0b0b474 100644 --- a/internal/user/events.go +++ b/internal/user/events.go @@ -334,7 +334,10 @@ func (user *User) handleUpdateLabelEvent(_ context.Context, event liteapi.LabelE user.apiLabels[event.Label.ID] = event.Label for _, updateCh := range user.updateCh { - updateCh.Enqueue(imap.NewMailboxUpdated(imap.MailboxID(event.ID), getMailboxName(event.Label))) + updateCh.Enqueue(imap.NewMailboxUpdated( + imap.MailboxID(event.ID), + getMailboxName(event.Label), + )) } user.eventCh.Enqueue(events.UserLabelUpdated{ @@ -453,14 +456,12 @@ func (user *User) handleUpdateMessageEvent(ctx context.Context, event liteapi.Me "subject": logging.Sensitive(event.Message.Subject), }).Info("Handling message updated event") - update := imap.NewMessageMailboxesUpdated( + user.updateCh[event.Message.AddressID].Enqueue(imap.NewMessageMailboxesUpdated( imap.MessageID(event.ID), mapTo[string, imap.MailboxID](xslices.Filter(event.Message.LabelIDs, wantLabelID)), event.Message.Seen(), event.Message.Starred(), - ) - - user.updateCh[event.Message.AddressID].Enqueue(update) + )) return nil }, user.updateChLock) @@ -471,11 +472,7 @@ func (user *User) handleDeleteMessageEvent(ctx context.Context, event liteapi.Me user.log.WithField("messageID", event.ID).Info("Handling message deleted event") for _, updateCh := range user.updateCh { - update := imap.NewMessagesDeleted( - imap.MessageID(event.ID), - ) - - updateCh.Enqueue(update) + updateCh.Enqueue(imap.NewMessagesDeleted(imap.MessageID(event.ID))) } return nil @@ -500,9 +497,13 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event liteapi.Mess return fmt.Errorf("failed to build RFC822 draft: %w", err) } - update := imap.NewMessageUpdated(buildRes.update.Message, buildRes.update.Literal, buildRes.update.MailboxIDs, buildRes.update.ParsedMessage) + user.updateCh[full.AddressID].Enqueue(imap.NewMessageUpdated( + buildRes.update.Message, + buildRes.update.Literal, + buildRes.update.MailboxIDs, + buildRes.update.ParsedMessage, + )) - user.updateCh[full.AddressID].Enqueue(update) return nil }) }, user.apiUserLock, user.apiAddrsLock, user.updateChLock) diff --git a/internal/user/user.go b/internal/user/user.go index 91301d6a..56643438 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -516,12 +516,26 @@ func (user *User) doEventPoll(ctx context.Context) error { "new": event, }).Info("Received new API event") + // Handle the event. if err := user.handleAPIEvent(ctx, event); err != nil { return fmt.Errorf("failed to handle event: %w", err) } user.log.WithField("event", event).Debug("Handled API event") + // Wait for all events to be applied. + safe.RLock(func() { + for _, updateCh := range user.updateCh { + update := imap.NewNoop() + defer update.WaitContext(ctx) + + updateCh.Enqueue(update) + } + }, user.updateChLock) + + user.log.WithField("event", event).Debug("All events applied to gluon") + + // Update the event ID in the vault. if err := user.vault.SetEventID(event.EventID); err != nil { return fmt.Errorf("failed to update event ID: %w", err) }