From 24331f971594c800a9ed2b187c289262ebf17718 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Fri, 11 Aug 2023 11:23:28 +0200 Subject: [PATCH] fix(GODT-2803): Separate conditions to pause event loop for IMAP Add two separate toggles to control event loop pausing. This is required to prevent cases where the bridge requests the event loop to be paused but a sync process completes and resumes the event loop. For the loop to resume now both states need to be set to false. This will be removed once GODT-2848 is implemented. --- internal/services/imapservice/service.go | 5 +++-- internal/services/imapservice/service_sync.go | 2 +- internal/services/userevents/service.go | 19 ++++++++++++++++++- internal/services/userevents/service_test.go | 6 ++++++ internal/user/user.go | 6 ++++++ 5 files changed, 34 insertions(+), 4 deletions(-) diff --git a/internal/services/imapservice/service.go b/internal/services/imapservice/service.go index 0b14f132..168434a2 100644 --- a/internal/services/imapservice/service.go +++ b/internal/services/imapservice/service.go @@ -38,7 +38,8 @@ import ( type EventProvider interface { userevents.Subscribable - userevents.EventController + PauseIMAP() + ResumeIMAP() } type Telemetry interface { @@ -362,7 +363,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo } s.log.Info("Sync complete, starting API event stream") - s.eventProvider.Resume() + s.eventProvider.ResumeIMAP() } case update, ok := <-s.syncHandler.updater.ch: diff --git a/internal/services/imapservice/service_sync.go b/internal/services/imapservice/service_sync.go index 8c21952b..d41acdbb 100644 --- a/internal/services/imapservice/service_sync.go +++ b/internal/services/imapservice/service_sync.go @@ -93,7 +93,7 @@ func (s *syncHandler) OnSyncFinishedCH() <-chan error { } func (s *syncHandler) launch(service *Service) { - service.eventProvider.Pause() + service.eventProvider.PauseIMAP() labels := service.labels.GetLabelMap() diff --git a/internal/services/userevents/service.go b/internal/services/userevents/service.go index 398290e7..bc8eaaab 100644 --- a/internal/services/userevents/service.go +++ b/internal/services/userevents/service.go @@ -55,6 +55,7 @@ type Service struct { timer *proton.Ticker eventTimeout time.Duration paused uint32 + pausedIMAP uint32 panicHandler async.PanicHandler subscriberList eventSubscriberList @@ -87,6 +88,7 @@ func NewService( eventPublisher: eventPublisher, timer: proton.NewTicker(pollPeriod, jitter, panicHandler), paused: 1, + pausedIMAP: 1, eventTimeout: eventTimeout, panicHandler: panicHandler, } @@ -117,6 +119,12 @@ func (s *Service) Pause() { atomic.StoreUint32(&s.paused, 1) } +// PauseIMAP special handler for the IMAP Service - Do Not Use. +func (s *Service) PauseIMAP() { + s.log.Info("Pausing from IMAP") + atomic.StoreUint32(&s.pausedIMAP, 1) +} + // PauseWithWaiter pauses the event polling and returns a waiter to notify when the last event has been published // after the pause request. func (s *Service) PauseWithWaiter() *EventPollWaiter { @@ -137,9 +145,18 @@ func (s *Service) Resume() { atomic.StoreUint32(&s.paused, 0) } +// ResumeIMAP special handler for the IMAP Service - Do Not Use. +func (s *Service) ResumeIMAP() { + s.log.Info("Resuming from IMAP") + atomic.StoreUint32(&s.pausedIMAP, 0) +} + // IsPaused return true if the service is paused. func (s *Service) IsPaused() bool { - return atomic.LoadUint32(&s.paused) == 1 + // We need to check both IMAP and service paused conditions here to determine if the service is + // paused. There can be instances where the sync from IMAP can overwrite a previous request to pause the loop once + // it is finished. To be addressed as part of GODT-2848. + return atomic.LoadUint32(&s.paused) == 1 || atomic.LoadUint32(&s.pausedIMAP) == 1 } func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error { diff --git a/internal/services/userevents/service_test.go b/internal/services/userevents/service_test.go index fae69d50..6f95b1e8 100644 --- a/internal/services/userevents/service_test.go +++ b/internal/services/userevents/service_test.go @@ -78,6 +78,7 @@ func TestService_EventIDLoadStore(t *testing.T) { ) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -132,6 +133,7 @@ func TestService_RetryEventOnNonCatastrophicFailure(t *testing.T) { require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -194,6 +196,7 @@ func TestService_OnBadEventServiceIsPaused(t *testing.T) { service.Subscribe(NewCallbackSubscriber("foo", EventHandler{MessageHandler: subscriber})) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -250,6 +253,7 @@ func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T service.Subscribe(subscription) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -308,6 +312,7 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T service.Subscribe(subscription) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -370,6 +375,7 @@ func TestService_WaitOnEventPublishAfterPause(t *testing.T) { service.Subscribe(NewCallbackSubscriber("foo", EventHandler{MessageHandler: subscriber})) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } diff --git a/internal/user/user.go b/internal/user/user.go index 2d6f9d02..b7694e35 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -296,6 +296,8 @@ func newImpl( return user, fmt.Errorf("failed to start imap service: %w", err) } + user.eventService.Resume() + return user, nil } @@ -411,6 +413,8 @@ func (user *User) BadEventFeedbackResync(ctx context.Context) error { return fmt.Errorf("failed to resync imap service: %w", err) } + user.eventService.Resume() + return nil } @@ -534,6 +538,8 @@ func (user *User) CheckAuth(email string, password []byte) (string, error) { func (user *User) OnStatusUp(ctx context.Context) { user.log.Info("Connection is up") + user.eventService.Resume() + if err := user.imapService.ResumeSync(ctx); err != nil { user.log.WithError(err).Error("Failed to resume sync") }