GODT-2047: Clear last event ID when clearing sync status

This commit is contained in:
James Houlahan
2022-11-17 13:18:47 +00:00
parent fe5e8ce7f7
commit 1e48ab4b9c
5 changed files with 68 additions and 70 deletions

View File

@ -74,14 +74,13 @@ func (user *User) handleRefreshEvent(ctx context.Context, refresh liteapi.Refres
l.Info("Handling refresh event")
context := map[string]interface{}{
if err := user.reporter.ReportMessageWithContext("Warning: refresh occurred", map[string]interface{}{
"EventLoop": map[string]interface{}{
"EventID": eventID,
"Refresh": refresh,
},
}
if sentryErr := user.reporter.ReportMessageWithContext("Warning: refresh occurred", context); sentryErr != nil {
l.WithError(sentryErr).Error("Failed to report refresh to sentry")
}); err != nil {
l.WithError(err).Error("Failed to report refresh to sentry")
}
// Cancel and restart ongoing syncs.

View File

@ -43,9 +43,21 @@ const (
)
// doSync begins syncing the users data.
// It first ensures the latest event ID is known; if not, it fetches it.
// It sends a SyncStarted event and then either SyncFinished or SyncFailed
// depending on whether the sync was successful.
func (user *User) doSync(ctx context.Context) error {
if user.vault.EventID() == "" {
eventID, err := user.client.GetLatestEventID(ctx)
if err != nil {
return fmt.Errorf("failed to get latest event ID: %w", err)
}
if err := user.vault.SetEventID(eventID); err != nil {
return fmt.Errorf("failed to set latest event ID: %w", err)
}
}
start := time.Now()
user.log.WithField("start", start).Info("Beginning user sync")

View File

@ -110,36 +110,7 @@ func New(
return nil, fmt.Errorf("failed to get labels: %w", err)
}
// Get the latest event ID.
if encVault.EventID() == "" {
eventID, err := client.GetLatestEventID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get latest event ID: %w", err)
}
if err := encVault.SetEventID(eventID); err != nil {
return nil, fmt.Errorf("failed to set event ID: %w", err)
}
}
// Create update channels for each of the user's addresses.
// In combined mode, the addresses all share the same update channel.
updateCh := make(map[string]*queue.QueuedChannel[imap.Update])
switch encVault.AddressMode() {
case vault.CombinedMode:
primaryUpdateCh := queue.NewQueuedChannel[imap.Update](0, 0)
for _, addr := range apiAddrs {
updateCh[addr.ID] = primaryUpdateCh
}
case vault.SplitMode:
for _, addr := range apiAddrs {
updateCh[addr.ID] = queue.NewQueuedChannel[imap.Update](0, 0)
}
}
// Create the user object.
user := &User{
log: logrus.WithField("userID", apiUser.ID),
@ -157,7 +128,7 @@ func New(
apiLabels: groupBy(apiLabels, func(label liteapi.Label) string { return label.ID }),
apiLabelsLock: safe.NewRWMutex(),
updateCh: updateCh,
updateCh: make(map[string]*queue.QueuedChannel[imap.Update]),
updateChLock: safe.NewRWMutex(),
reporter: reporter,
@ -169,6 +140,9 @@ func New(
showAllMail: b32(showAllMail),
}
// Initialize the user's update channels for its current address mode.
user.initUpdateCh(encVault.AddressMode())
// When we receive an auth object, we update it in the vault.
// This will be used to authorize the user on the next run.
user.client.AddAuthHandler(func(auth liteapi.Auth) {
@ -196,35 +170,13 @@ func New(
// When we receive an API event, we attempt to handle it.
// If successful, we update the event ID in the vault.
user.goPoll = user.tasks.PeriodicOrTrigger(EventPeriod, EventJitter, func(ctx context.Context) {
user.log.Debug("Event poll triggered")
if !user.vault.SyncStatus().IsComplete() {
user.log.Debug("Sync is incomplete, skipping event stream")
return
user.log.Debug("Sync is incomplete, skipping event poll")
} else if err := user.doEventPoll(ctx); err != nil {
user.log.WithError(err).Error("Failed to poll events")
}
event, err := user.client.GetEvent(ctx, user.vault.EventID())
if err != nil {
user.log.WithError(err).Error("Failed to get event")
return
}
if event.EventID == user.vault.EventID() {
user.log.Debug("No new events")
return
}
user.log.WithField("event", event).Info("Received event")
if err := user.handleAPIEvent(ctx, event); err != nil {
user.log.WithError(err).Error("Failed to handle API event")
return
}
if err := user.vault.SetEventID(event.EventID); err != nil {
user.log.WithError(err).Error("Failed to update event ID in vault")
return
}
user.log.WithField("eventID", event.EventID).Debug("Updated event ID in vault")
})
// When triggered, attempt to sync the user.
@ -629,6 +581,37 @@ func (user *User) initUpdateCh(mode vault.AddressMode) {
}
}
// doEventPoll is called whenever API events should be polled.
func (user *User) doEventPoll(ctx context.Context) error {
event, err := user.client.GetEvent(ctx, user.vault.EventID())
if err != nil {
return fmt.Errorf("failed to get event: %w", err)
}
if event.EventID != user.vault.EventID() {
user.log.WithFields(logrus.Fields{
"old": user.vault.EventID(),
"new": event,
}).Info("Received new API event")
if err := user.handleAPIEvent(ctx, event); err != nil {
return fmt.Errorf("failed to handle event: %w", err)
}
user.log.WithField("event", event).Debug("Handled API event")
if err := user.vault.SetEventID(event.EventID); err != nil {
return fmt.Errorf("failed to update event ID: %w", err)
}
user.log.WithField("eventID", event.EventID).Debug("Updated event ID in vault")
} else {
user.log.Debug("No new API events")
}
return nil
}
// b32 returns a uint32 0 or 1 representing b.
func b32(b bool) uint32 {
if b {

View File

@ -128,13 +128,15 @@ func TestUser_Deauth(t *testing.T) {
withAPI(t, context.Background(), func(ctx context.Context, s *server.Server, m *liteapi.Manager) {
withAccount(t, s, "username", "password", []string{"email@pm.me"}, func(string, []string) {
withUser(t, ctx, s, m, "username", "password", func(user *User) {
eventCh := user.GetEventCh()
require.IsType(t, events.SyncStarted{}, <-user.GetEventCh())
require.IsType(t, events.SyncProgress{}, <-user.GetEventCh())
require.IsType(t, events.SyncFinished{}, <-user.GetEventCh())
// Revoke the user's auth token.
require.NoError(t, s.RevokeUser(user.ID()))
// The user should eventually be logged out.
require.Eventually(t, func() bool { _, ok := (<-eventCh).(events.UserDeauth); return ok }, 500*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { _, ok := (<-user.GetEventCh()).(events.UserDeauth); return ok }, 500*time.Second, 100*time.Millisecond)
})
})
})
@ -145,8 +147,12 @@ func TestUser_Refresh(t *testing.T) {
mockReporter := mocks.NewMockReporter(ctl)
withAPI(t, context.Background(), func(ctx context.Context, s *server.Server, m *liteapi.Manager) {
withAccount(t, s, "username", "password", []string{"email@pm.me"}, func(userID string, addrIDs []string) {
withAccount(t, s, "username", "password", []string{"email@pm.me"}, func(string, []string) {
withUser(t, ctx, s, m, "username", "password", func(user *User) {
require.IsType(t, events.SyncStarted{}, <-user.GetEventCh())
require.IsType(t, events.SyncProgress{}, <-user.GetEventCh())
require.IsType(t, events.SyncFinished{}, <-user.GetEventCh())
user.reporter = mockReporter
mockReporter.EXPECT().ReportMessageWithContext(
@ -154,14 +160,11 @@ func TestUser_Refresh(t *testing.T) {
mocks.NewRefreshContextMatcher(liteapi.RefreshAll),
).Return(nil)
// Get the event channel.
eventCh := user.GetEventCh()
// Send refresh event
require.NoError(t, s.RefreshUser(user.ID(), liteapi.RefreshAll))
// The user should eventually re-synced.
require.Eventually(t, func() bool { _, ok := (<-eventCh).(events.UserRefreshed); return ok }, 5*time.Second, 100*time.Millisecond)
// The user should eventually be re-synced.
require.Eventually(t, func() bool { _, ok := (<-user.GetEventCh()).(events.UserRefreshed); return ok }, 5*time.Second, 100*time.Millisecond)
})
})
})

View File

@ -159,6 +159,7 @@ func (user *User) SetLastMessageID(messageID string) error {
func (user *User) ClearSyncStatus() error {
return user.vault.modUser(user.userID, func(data *UserData) {
data.SyncStatus = SyncStatus{}
data.EventID = ""
})
}