fix(GODT-2803): Separate conditions to pause event loop for IMAP

Add two separate toggles to control event loop pausing. This is required
to prevent cases where the bridge requests the event loop to be paused
but a sync process completes and resumes the event loop.

For the loop to resume now both states need to be set to false. This
will be removed once GODT-2848 is implemented.
This commit is contained in:
Leander Beernaert
2023-08-11 11:23:28 +02:00
parent a5500629e5
commit 24331f9715
5 changed files with 34 additions and 4 deletions

View File

@ -38,7 +38,8 @@ import (
type EventProvider interface {
userevents.Subscribable
userevents.EventController
PauseIMAP()
ResumeIMAP()
}
type Telemetry interface {
@ -362,7 +363,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
}
s.log.Info("Sync complete, starting API event stream")
s.eventProvider.Resume()
s.eventProvider.ResumeIMAP()
}
case update, ok := <-s.syncHandler.updater.ch:

View File

@ -93,7 +93,7 @@ func (s *syncHandler) OnSyncFinishedCH() <-chan error {
}
func (s *syncHandler) launch(service *Service) {
service.eventProvider.Pause()
service.eventProvider.PauseIMAP()
labels := service.labels.GetLabelMap()

View File

@ -55,6 +55,7 @@ type Service struct {
timer *proton.Ticker
eventTimeout time.Duration
paused uint32
pausedIMAP uint32
panicHandler async.PanicHandler
subscriberList eventSubscriberList
@ -87,6 +88,7 @@ func NewService(
eventPublisher: eventPublisher,
timer: proton.NewTicker(pollPeriod, jitter, panicHandler),
paused: 1,
pausedIMAP: 1,
eventTimeout: eventTimeout,
panicHandler: panicHandler,
}
@ -117,6 +119,12 @@ func (s *Service) Pause() {
atomic.StoreUint32(&s.paused, 1)
}
// PauseIMAP special handler for the IMAP Service - Do Not Use.
func (s *Service) PauseIMAP() {
s.log.Info("Pausing from IMAP")
atomic.StoreUint32(&s.pausedIMAP, 1)
}
// PauseWithWaiter pauses the event polling and returns a waiter to notify when the last event has been published
// after the pause request.
func (s *Service) PauseWithWaiter() *EventPollWaiter {
@ -137,9 +145,18 @@ func (s *Service) Resume() {
atomic.StoreUint32(&s.paused, 0)
}
// ResumeIMAP special handler for the IMAP Service - Do Not Use.
func (s *Service) ResumeIMAP() {
s.log.Info("Resuming from IMAP")
atomic.StoreUint32(&s.pausedIMAP, 0)
}
// IsPaused return true if the service is paused.
func (s *Service) IsPaused() bool {
return atomic.LoadUint32(&s.paused) == 1
// We need to check both IMAP and service paused conditions here to determine if the service is
// paused. There can be instances where the sync from IMAP can overwrite a previous request to pause the loop once
// it is finished. To be addressed as part of GODT-2848.
return atomic.LoadUint32(&s.paused) == 1 || atomic.LoadUint32(&s.pausedIMAP) == 1
}
func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error {

View File

@ -78,6 +78,7 @@ func TestService_EventIDLoadStore(t *testing.T) {
)
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
service.ResumeIMAP()
group.Wait()
}
@ -132,6 +133,7 @@ func TestService_RetryEventOnNonCatastrophicFailure(t *testing.T) {
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
service.ResumeIMAP()
group.Wait()
}
@ -194,6 +196,7 @@ func TestService_OnBadEventServiceIsPaused(t *testing.T) {
service.Subscribe(NewCallbackSubscriber("foo", EventHandler{MessageHandler: subscriber}))
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
service.ResumeIMAP()
group.Wait()
}
@ -250,6 +253,7 @@ func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T
service.Subscribe(subscription)
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
service.ResumeIMAP()
group.Wait()
}
@ -308,6 +312,7 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T
service.Subscribe(subscription)
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
service.ResumeIMAP()
group.Wait()
}
@ -370,6 +375,7 @@ func TestService_WaitOnEventPublishAfterPause(t *testing.T) {
service.Subscribe(NewCallbackSubscriber("foo", EventHandler{MessageHandler: subscriber}))
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
service.ResumeIMAP()
group.Wait()
}

View File

@ -296,6 +296,8 @@ func newImpl(
return user, fmt.Errorf("failed to start imap service: %w", err)
}
user.eventService.Resume()
return user, nil
}
@ -411,6 +413,8 @@ func (user *User) BadEventFeedbackResync(ctx context.Context) error {
return fmt.Errorf("failed to resync imap service: %w", err)
}
user.eventService.Resume()
return nil
}
@ -534,6 +538,8 @@ func (user *User) CheckAuth(email string, password []byte) (string, error) {
func (user *User) OnStatusUp(ctx context.Context) {
user.log.Info("Connection is up")
user.eventService.Resume()
if err := user.imapService.ResumeSync(ctx); err != nil {
user.log.WithError(err).Error("Failed to resume sync")
}