From 1e48ab4b9c3349213d976ae8b085e84f4afd1ba7 Mon Sep 17 00:00:00 2001 From: James Houlahan Date: Thu, 17 Nov 2022 13:18:47 +0000 Subject: [PATCH] GODT-2047: Clear last event ID when clearing sync status --- internal/user/events.go | 7 ++- internal/user/sync.go | 12 +++++ internal/user/user.go | 99 ++++++++++++++++---------------------- internal/user/user_test.go | 19 +++++--- internal/vault/user.go | 1 + 5 files changed, 68 insertions(+), 70 deletions(-) diff --git a/internal/user/events.go b/internal/user/events.go index b09c581b..51145e23 100644 --- a/internal/user/events.go +++ b/internal/user/events.go @@ -74,14 +74,13 @@ func (user *User) handleRefreshEvent(ctx context.Context, refresh liteapi.Refres l.Info("Handling refresh event") - context := map[string]interface{}{ + if err := user.reporter.ReportMessageWithContext("Warning: refresh occurred", map[string]interface{}{ "EventLoop": map[string]interface{}{ "EventID": eventID, "Refresh": refresh, }, - } - if sentryErr := user.reporter.ReportMessageWithContext("Warning: refresh occurred", context); sentryErr != nil { - l.WithError(sentryErr).Error("Failed to report refresh to sentry") + }); err != nil { + l.WithError(err).Error("Failed to report refresh to sentry") } // Cancel and restart ongoing syncs. diff --git a/internal/user/sync.go b/internal/user/sync.go index f9810818..c1623724 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -43,9 +43,21 @@ const ( ) // doSync begins syncing the users data. +// It first ensures the latest event ID is known; if not, it fetches it. // It sends a SyncStarted event and then either SyncFinished or SyncFailed // depending on whether the sync was successful. func (user *User) doSync(ctx context.Context) error { + if user.vault.EventID() == "" { + eventID, err := user.client.GetLatestEventID(ctx) + if err != nil { + return fmt.Errorf("failed to get latest event ID: %w", err) + } + + if err := user.vault.SetEventID(eventID); err != nil { + return fmt.Errorf("failed to set latest event ID: %w", err) + } + } + start := time.Now() user.log.WithField("start", start).Info("Beginning user sync") diff --git a/internal/user/user.go b/internal/user/user.go index 32e78ca5..0f567ec4 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -110,36 +110,7 @@ func New( return nil, fmt.Errorf("failed to get labels: %w", err) } - // Get the latest event ID. - if encVault.EventID() == "" { - eventID, err := client.GetLatestEventID(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get latest event ID: %w", err) - } - - if err := encVault.SetEventID(eventID); err != nil { - return nil, fmt.Errorf("failed to set event ID: %w", err) - } - } - - // Create update channels for each of the user's addresses. - // In combined mode, the addresses all share the same update channel. - updateCh := make(map[string]*queue.QueuedChannel[imap.Update]) - - switch encVault.AddressMode() { - case vault.CombinedMode: - primaryUpdateCh := queue.NewQueuedChannel[imap.Update](0, 0) - - for _, addr := range apiAddrs { - updateCh[addr.ID] = primaryUpdateCh - } - - case vault.SplitMode: - for _, addr := range apiAddrs { - updateCh[addr.ID] = queue.NewQueuedChannel[imap.Update](0, 0) - } - } - + // Create the user object. user := &User{ log: logrus.WithField("userID", apiUser.ID), @@ -157,7 +128,7 @@ func New( apiLabels: groupBy(apiLabels, func(label liteapi.Label) string { return label.ID }), apiLabelsLock: safe.NewRWMutex(), - updateCh: updateCh, + updateCh: make(map[string]*queue.QueuedChannel[imap.Update]), updateChLock: safe.NewRWMutex(), reporter: reporter, @@ -169,6 +140,9 @@ func New( showAllMail: b32(showAllMail), } + // Initialize the user's update channels for its current address mode. + user.initUpdateCh(encVault.AddressMode()) + // When we receive an auth object, we update it in the vault. // This will be used to authorize the user on the next run. user.client.AddAuthHandler(func(auth liteapi.Auth) { @@ -196,35 +170,13 @@ func New( // When we receive an API event, we attempt to handle it. // If successful, we update the event ID in the vault. user.goPoll = user.tasks.PeriodicOrTrigger(EventPeriod, EventJitter, func(ctx context.Context) { + user.log.Debug("Event poll triggered") + if !user.vault.SyncStatus().IsComplete() { - user.log.Debug("Sync is incomplete, skipping event stream") - return + user.log.Debug("Sync is incomplete, skipping event poll") + } else if err := user.doEventPoll(ctx); err != nil { + user.log.WithError(err).Error("Failed to poll events") } - - event, err := user.client.GetEvent(ctx, user.vault.EventID()) - if err != nil { - user.log.WithError(err).Error("Failed to get event") - return - } - - if event.EventID == user.vault.EventID() { - user.log.Debug("No new events") - return - } - - user.log.WithField("event", event).Info("Received event") - - if err := user.handleAPIEvent(ctx, event); err != nil { - user.log.WithError(err).Error("Failed to handle API event") - return - } - - if err := user.vault.SetEventID(event.EventID); err != nil { - user.log.WithError(err).Error("Failed to update event ID in vault") - return - } - - user.log.WithField("eventID", event.EventID).Debug("Updated event ID in vault") }) // When triggered, attempt to sync the user. @@ -629,6 +581,37 @@ func (user *User) initUpdateCh(mode vault.AddressMode) { } } +// doEventPoll is called whenever API events should be polled. +func (user *User) doEventPoll(ctx context.Context) error { + event, err := user.client.GetEvent(ctx, user.vault.EventID()) + if err != nil { + return fmt.Errorf("failed to get event: %w", err) + } + + if event.EventID != user.vault.EventID() { + user.log.WithFields(logrus.Fields{ + "old": user.vault.EventID(), + "new": event, + }).Info("Received new API event") + + if err := user.handleAPIEvent(ctx, event); err != nil { + return fmt.Errorf("failed to handle event: %w", err) + } + + user.log.WithField("event", event).Debug("Handled API event") + + if err := user.vault.SetEventID(event.EventID); err != nil { + return fmt.Errorf("failed to update event ID: %w", err) + } + + user.log.WithField("eventID", event.EventID).Debug("Updated event ID in vault") + } else { + user.log.Debug("No new API events") + } + + return nil +} + // b32 returns a uint32 0 or 1 representing b. func b32(b bool) uint32 { if b { diff --git a/internal/user/user_test.go b/internal/user/user_test.go index df769268..6337d19d 100644 --- a/internal/user/user_test.go +++ b/internal/user/user_test.go @@ -128,13 +128,15 @@ func TestUser_Deauth(t *testing.T) { withAPI(t, context.Background(), func(ctx context.Context, s *server.Server, m *liteapi.Manager) { withAccount(t, s, "username", "password", []string{"email@pm.me"}, func(string, []string) { withUser(t, ctx, s, m, "username", "password", func(user *User) { - eventCh := user.GetEventCh() + require.IsType(t, events.SyncStarted{}, <-user.GetEventCh()) + require.IsType(t, events.SyncProgress{}, <-user.GetEventCh()) + require.IsType(t, events.SyncFinished{}, <-user.GetEventCh()) // Revoke the user's auth token. require.NoError(t, s.RevokeUser(user.ID())) // The user should eventually be logged out. - require.Eventually(t, func() bool { _, ok := (<-eventCh).(events.UserDeauth); return ok }, 500*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { _, ok := (<-user.GetEventCh()).(events.UserDeauth); return ok }, 500*time.Second, 100*time.Millisecond) }) }) }) @@ -145,8 +147,12 @@ func TestUser_Refresh(t *testing.T) { mockReporter := mocks.NewMockReporter(ctl) withAPI(t, context.Background(), func(ctx context.Context, s *server.Server, m *liteapi.Manager) { - withAccount(t, s, "username", "password", []string{"email@pm.me"}, func(userID string, addrIDs []string) { + withAccount(t, s, "username", "password", []string{"email@pm.me"}, func(string, []string) { withUser(t, ctx, s, m, "username", "password", func(user *User) { + require.IsType(t, events.SyncStarted{}, <-user.GetEventCh()) + require.IsType(t, events.SyncProgress{}, <-user.GetEventCh()) + require.IsType(t, events.SyncFinished{}, <-user.GetEventCh()) + user.reporter = mockReporter mockReporter.EXPECT().ReportMessageWithContext( @@ -154,14 +160,11 @@ func TestUser_Refresh(t *testing.T) { mocks.NewRefreshContextMatcher(liteapi.RefreshAll), ).Return(nil) - // Get the event channel. - eventCh := user.GetEventCh() - // Send refresh event require.NoError(t, s.RefreshUser(user.ID(), liteapi.RefreshAll)) - // The user should eventually re-synced. - require.Eventually(t, func() bool { _, ok := (<-eventCh).(events.UserRefreshed); return ok }, 5*time.Second, 100*time.Millisecond) + // The user should eventually be re-synced. + require.Eventually(t, func() bool { _, ok := (<-user.GetEventCh()).(events.UserRefreshed); return ok }, 5*time.Second, 100*time.Millisecond) }) }) }) diff --git a/internal/vault/user.go b/internal/vault/user.go index 6151e70d..28b1d424 100644 --- a/internal/vault/user.go +++ b/internal/vault/user.go @@ -159,6 +159,7 @@ func (user *User) SetLastMessageID(messageID string) error { func (user *User) ClearSyncStatus() error { return user.vault.modUser(user.userID, func(data *UserData) { data.SyncStatus = SyncStatus{} + data.EventID = "" }) }