diff --git a/internal/services/imapservice/service.go b/internal/services/imapservice/service.go index 0b14f132..168434a2 100644 --- a/internal/services/imapservice/service.go +++ b/internal/services/imapservice/service.go @@ -38,7 +38,8 @@ import ( type EventProvider interface { userevents.Subscribable - userevents.EventController + PauseIMAP() + ResumeIMAP() } type Telemetry interface { @@ -362,7 +363,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo } s.log.Info("Sync complete, starting API event stream") - s.eventProvider.Resume() + s.eventProvider.ResumeIMAP() } case update, ok := <-s.syncHandler.updater.ch: diff --git a/internal/services/imapservice/service_sync.go b/internal/services/imapservice/service_sync.go index 8c21952b..d41acdbb 100644 --- a/internal/services/imapservice/service_sync.go +++ b/internal/services/imapservice/service_sync.go @@ -93,7 +93,7 @@ func (s *syncHandler) OnSyncFinishedCH() <-chan error { } func (s *syncHandler) launch(service *Service) { - service.eventProvider.Pause() + service.eventProvider.PauseIMAP() labels := service.labels.GetLabelMap() diff --git a/internal/services/userevents/service.go b/internal/services/userevents/service.go index 398290e7..bc8eaaab 100644 --- a/internal/services/userevents/service.go +++ b/internal/services/userevents/service.go @@ -55,6 +55,7 @@ type Service struct { timer *proton.Ticker eventTimeout time.Duration paused uint32 + pausedIMAP uint32 panicHandler async.PanicHandler subscriberList eventSubscriberList @@ -87,6 +88,7 @@ func NewService( eventPublisher: eventPublisher, timer: proton.NewTicker(pollPeriod, jitter, panicHandler), paused: 1, + pausedIMAP: 1, eventTimeout: eventTimeout, panicHandler: panicHandler, } @@ -117,6 +119,12 @@ func (s *Service) Pause() { atomic.StoreUint32(&s.paused, 1) } +// PauseIMAP special handler for the IMAP Service - Do Not Use. +func (s *Service) PauseIMAP() { + s.log.Info("Pausing from IMAP") + atomic.StoreUint32(&s.pausedIMAP, 1) +} + // PauseWithWaiter pauses the event polling and returns a waiter to notify when the last event has been published // after the pause request. func (s *Service) PauseWithWaiter() *EventPollWaiter { @@ -137,9 +145,18 @@ func (s *Service) Resume() { atomic.StoreUint32(&s.paused, 0) } +// ResumeIMAP special handler for the IMAP Service - Do Not Use. +func (s *Service) ResumeIMAP() { + s.log.Info("Resuming from IMAP") + atomic.StoreUint32(&s.pausedIMAP, 0) +} + // IsPaused return true if the service is paused. func (s *Service) IsPaused() bool { - return atomic.LoadUint32(&s.paused) == 1 + // We need to check both IMAP and service paused conditions here to determine if the service is + // paused. There can be instances where the sync from IMAP can overwrite a previous request to pause the loop once + // it is finished. To be addressed as part of GODT-2848. + return atomic.LoadUint32(&s.paused) == 1 || atomic.LoadUint32(&s.pausedIMAP) == 1 } func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error { diff --git a/internal/services/userevents/service_test.go b/internal/services/userevents/service_test.go index fae69d50..6f95b1e8 100644 --- a/internal/services/userevents/service_test.go +++ b/internal/services/userevents/service_test.go @@ -78,6 +78,7 @@ func TestService_EventIDLoadStore(t *testing.T) { ) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -132,6 +133,7 @@ func TestService_RetryEventOnNonCatastrophicFailure(t *testing.T) { require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -194,6 +196,7 @@ func TestService_OnBadEventServiceIsPaused(t *testing.T) { service.Subscribe(NewCallbackSubscriber("foo", EventHandler{MessageHandler: subscriber})) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -250,6 +253,7 @@ func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T service.Subscribe(subscription) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -308,6 +312,7 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T service.Subscribe(subscription) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } @@ -370,6 +375,7 @@ func TestService_WaitOnEventPublishAfterPause(t *testing.T) { service.Subscribe(NewCallbackSubscriber("foo", EventHandler{MessageHandler: subscriber})) require.NoError(t, service.Start(context.Background(), group)) service.Resume() + service.ResumeIMAP() group.Wait() } diff --git a/internal/user/user.go b/internal/user/user.go index 2d6f9d02..b7694e35 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -296,6 +296,8 @@ func newImpl( return user, fmt.Errorf("failed to start imap service: %w", err) } + user.eventService.Resume() + return user, nil } @@ -411,6 +413,8 @@ func (user *User) BadEventFeedbackResync(ctx context.Context) error { return fmt.Errorf("failed to resync imap service: %w", err) } + user.eventService.Resume() + return nil } @@ -534,6 +538,8 @@ func (user *User) CheckAuth(email string, password []byte) (string, error) { func (user *User) OnStatusUp(ctx context.Context) { user.log.Info("Connection is up") + user.eventService.Resume() + if err := user.imapService.ResumeSync(ctx); err != nil { user.log.WithError(err).Error("Failed to resume sync") }