From 232875d5cccd685f86cf0b44447bed18c16b2846 Mon Sep 17 00:00:00 2001 From: James Houlahan Date: Fri, 3 Feb 2023 15:06:27 +0100 Subject: [PATCH] fix(GODT-2327): Delay event processing until gluon user exists We don't want to start processing events until those events have somewhere to be sent to. Also, to be safe, ensure remove and re-add the gluon user while clearing its sync status. This shouldn't be necessary. --- internal/bridge/imap.go | 18 ++++++++++ internal/user/user.go | 77 ++++++++++++++++++++++++----------------- 2 files changed, 63 insertions(+), 32 deletions(-) diff --git a/internal/bridge/imap.go b/internal/bridge/imap.go index 16cdfebb..ba8d5421 100644 --- a/internal/bridge/imap.go +++ b/internal/bridge/imap.go @@ -131,9 +131,22 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error { // If the DB was newly created, clear the sync status; gluon's DB was not found. logrus.Warn("IMAP user DB was newly created, clearing sync status") + // Remove the user from IMAP so we can clear the sync status. + if err := bridge.imapServer.RemoveUser(ctx, gluonID, false); err != nil { + return fmt.Errorf("failed to remove IMAP user: %w", err) + } + + // Clear the sync status -- we need to resync all messages. if err := user.ClearSyncStatus(); err != nil { return fmt.Errorf("failed to clear sync status: %w", err) } + + // Add the user back to the IMAP server. + if isNew, err := bridge.imapServer.LoadUser(ctx, imapConn, gluonID, user.GluonKey()); err != nil { + return fmt.Errorf("failed to add IMAP user: %w", err) + } else if isNew { + panic("IMAP user should already have a database") + } } else if status := user.GetSyncStatus(); !status.HasLabels { // Otherwise, the DB already exists -- if the labels are not yet synced, we need to re-create the DB. if err := bridge.imapServer.RemoveUser(ctx, gluonID, true); err != nil { @@ -171,7 +184,12 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error { } } + // Trigger a sync for the user, if needed. user.TriggerSync() + + // Start processing events for the user. + user.StartEvents() + return nil } diff --git a/internal/user/user.go b/internal/user/user.go index 7fd12a0b..213835f7 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -171,6 +171,39 @@ func New( return nil }) + // When triggered, poll the API for events, optionally blocking until the poll is complete. + user.goPollAPIEvents = func(wait bool) { + doneCh := make(chan struct{}) + + go func() { user.pollAPIEventsCh <- doneCh }() + + if wait { + <-doneCh + } + } + + // When triggered, attempt to sync the user. + user.goSync = user.tasks.Trigger(func(ctx context.Context) { + user.log.Debug("Sync triggered") + + user.abortable.Do(ctx, func(ctx context.Context) { + if user.vault.SyncStatus().IsComplete() { + user.log.Debug("Sync is already complete, skipping") + } else if err := user.doSync(ctx); err != nil { + user.log.WithError(err).Error("Failed to sync, will retry later") + time.AfterFunc(SyncRetryCooldown, user.goSync) + } + }) + }) + + return user, nil +} + +func (user *User) TriggerSync() { + user.goSync() +} + +func (user *User) StartEvents() { // Stream events from the API, logging any errors that occur. // This does nothing until the sync has been marked as complete. // When we receive an API event, we attempt to handle it. @@ -206,37 +239,6 @@ func New( } } }) - - // When triggered, poll the API for events, optionally blocking until the poll is complete. - user.goPollAPIEvents = func(wait bool) { - doneCh := make(chan struct{}) - - go func() { user.pollAPIEventsCh <- doneCh }() - - if wait { - <-doneCh - } - } - - // When triggered, attempt to sync the user. - user.goSync = user.tasks.Trigger(func(ctx context.Context) { - user.log.Debug("Sync triggered") - - user.abortable.Do(ctx, func(ctx context.Context) { - if user.vault.SyncStatus().IsComplete() { - user.log.Debug("Sync is already complete, skipping") - } else if err := user.doSync(ctx); err != nil { - user.log.WithError(err).Error("Failed to sync, will retry later") - time.AfterFunc(SyncRetryCooldown, user.goSync) - } - }) - }) - - return user, nil -} - -func (user *User) TriggerSync() { - user.goSync() } // ID returns the user's ID. @@ -497,8 +499,19 @@ func (user *User) GetSyncStatus() vault.SyncStatus { } // ClearSyncStatus clears the sync status of the user. +// This also drops any updates in the update channel(s). func (user *User) ClearSyncStatus() error { - return user.vault.ClearSyncStatus() + user.log.Info("Clearing sync status") + + return safe.LockRet(func() error { + user.initUpdateCh(user.vault.AddressMode()) + + if err := user.vault.ClearSyncStatus(); err != nil { + return fmt.Errorf("failed to clear sync status: %w", err) + } + + return nil + }, user.eventLock, user.apiAddrsLock, user.updateChLock) } // Logout logs the user out from the API.