fix(GODT-2327): Only start processing events once sync is finished

This commit is contained in:
James Houlahan
2023-02-06 10:32:13 +01:00
committed by Jakub
parent 232875d5cc
commit 63bc87cc86
2 changed files with 52 additions and 46 deletions

View File

@ -187,9 +187,6 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error {
// Trigger a sync for the user, if needed. // Trigger a sync for the user, if needed.
user.TriggerSync() user.TriggerSync()
// Start processing events for the user.
user.StartEvents()
return nil return nil
} }

View File

@ -187,12 +187,23 @@ func New(
user.log.Debug("Sync triggered") user.log.Debug("Sync triggered")
user.abortable.Do(ctx, func(ctx context.Context) { user.abortable.Do(ctx, func(ctx context.Context) {
if user.vault.SyncStatus().IsComplete() { if !user.vault.SyncStatus().IsComplete() {
user.log.Debug("Sync is already complete, skipping") if err := user.doSync(ctx); err != nil {
} else if err := user.doSync(ctx); err != nil {
user.log.WithError(err).Error("Failed to sync, will retry later") user.log.WithError(err).Error("Failed to sync, will retry later")
time.AfterFunc(SyncRetryCooldown, user.goSync)
go func() {
select {
case <-ctx.Done():
user.log.WithError(err).Warn("Aborting sync retry")
case <-time.After(SyncRetryCooldown):
user.goSync()
} }
}()
}
}
// Once we know the sync has completed, we can start polling for API events.
user.startEvents(ctx)
}) })
}) })
@ -203,44 +214,6 @@ func (user *User) TriggerSync() {
user.goSync() user.goSync()
} }
func (user *User) StartEvents() {
// Stream events from the API, logging any errors that occur.
// This does nothing until the sync has been marked as complete.
// When we receive an API event, we attempt to handle it.
// If successful, we update the event ID in the vault.
user.tasks.Once(func(ctx context.Context) {
ticker := proton.NewTicker(EventPeriod, EventJitter)
defer ticker.Stop()
for {
var doneCh chan struct{}
select {
case <-ctx.Done():
return
case doneCh = <-user.pollAPIEventsCh:
// ...
case <-ticker.C:
// ...
}
user.log.Debug("Event poll triggered")
if !user.vault.SyncStatus().IsComplete() {
user.log.Debug("Sync is incomplete, skipping event poll")
} else if err := user.doEventPoll(ctx); err != nil {
user.log.WithError(err).Error("Failed to poll events")
}
if doneCh != nil {
close(doneCh)
}
}
})
}
// ID returns the user's ID. // ID returns the user's ID.
func (user *User) ID() string { func (user *User) ID() string {
return safe.RLockRet(func() string { return safe.RLockRet(func() string {
@ -589,6 +562,42 @@ func (user *User) initUpdateCh(mode vault.AddressMode) {
} }
} }
// startEvents streams events from the API, logging any errors that occur.
// This does nothing until the sync has been marked as complete.
// When we receive an API event, we attempt to handle it.
// If successful, we update the event ID in the vault.
func (user *User) startEvents(ctx context.Context) {
ticker := proton.NewTicker(EventPeriod, EventJitter)
defer ticker.Stop()
for {
var doneCh chan struct{}
select {
case <-ctx.Done():
return
case doneCh = <-user.pollAPIEventsCh:
// ...
case <-ticker.C:
// ...
}
user.log.Debug("Event poll triggered")
if !user.vault.SyncStatus().IsComplete() {
user.log.Debug("Sync is incomplete, skipping event poll")
} else if err := user.doEventPoll(ctx); err != nil {
user.log.WithError(err).Error("Failed to poll events")
}
if doneCh != nil {
close(doneCh)
}
}
}
// doEventPoll is called whenever API events should be polled. // doEventPoll is called whenever API events should be polled.
func (user *User) doEventPoll(ctx context.Context) error { func (user *User) doEventPoll(ctx context.Context) error {
user.eventLock.Lock() user.eventLock.Lock()