From a5500629e51f883cc094692be6db61d4143de4a1 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Fri, 11 Aug 2023 09:55:21 +0200 Subject: [PATCH] feat(GODT-2803): Add Event Poll Waiter Wait for the current event poll to finish publishing events after a request to pause the event loop. This is required to change the gluon cache directory. --- .../services/userevents/event_poll_waiter.go | 58 +++++++++++++++++ internal/services/userevents/service.go | 34 ++++++++++ internal/services/userevents/service_test.go | 62 +++++++++++++++++++ 3 files changed, 154 insertions(+) create mode 100644 internal/services/userevents/event_poll_waiter.go diff --git a/internal/services/userevents/event_poll_waiter.go b/internal/services/userevents/event_poll_waiter.go new file mode 100644 index 00000000..f63d4331 --- /dev/null +++ b/internal/services/userevents/event_poll_waiter.go @@ -0,0 +1,58 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package userevents + +import ( + "context" + "time" +) + +// EventPollWaiter is meant to be used to wait for the event loop to finish processing the current events after +// being paused. +type EventPollWaiter struct { + ch chan struct{} +} + +func newEventPollWaiter() *EventPollWaiter { + return &EventPollWaiter{ch: make(chan struct{})} +} + +func (e *EventPollWaiter) WaitPollFinished(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-e.ch: + return nil + } +} + +func (e *EventPollWaiter) WaitPollFinishedWithDeadline(ctx context.Context, t time.Time) error { + ctx, cancel := context.WithDeadline(ctx, t) + defer cancel() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-e.ch: + return nil + } +} + +func (e *EventPollWaiter) close() { + close(e.ch) +} diff --git a/internal/services/userevents/service.go b/internal/services/userevents/service.go index 471a4130..398290e7 100644 --- a/internal/services/userevents/service.go +++ b/internal/services/userevents/service.go @@ -61,6 +61,9 @@ type Service struct { pendingSubscriptionsLock sync.Mutex pendingSubscriptions []pendingSubscription + + eventPollWaiters []*EventPollWaiter + eventPollWaitersLock sync.Mutex } func NewService( @@ -114,6 +117,21 @@ func (s *Service) Pause() { atomic.StoreUint32(&s.paused, 1) } +// PauseWithWaiter pauses the event polling and returns a waiter to notify when the last event has been published +// after the pause request. +func (s *Service) PauseWithWaiter() *EventPollWaiter { + s.log.Info("Pausing") + atomic.StoreUint32(&s.paused, 1) + + waiter := newEventPollWaiter() + + s.eventPollWaitersLock.Lock() + s.eventPollWaiters = append(s.eventPollWaiters, waiter) + s.eventPollWaitersLock.Unlock() + + return waiter +} + // Resume resumes the event polling. func (s *Service) Resume() { atomic.StoreUint32(&s.paused, 0) @@ -163,6 +181,7 @@ func (s *Service) run(ctx context.Context, lastEventID string) { return case <-s.timer.C: if s.IsPaused() { + s.closePollWaiters() continue } } @@ -223,6 +242,10 @@ func (s *Service) run(ctx context.Context, lastEventID string) { } lastEventID = newEventID + + if s.IsPaused() { + s.closePollWaiters() + } } } @@ -249,6 +272,17 @@ func (s *Service) Close() { s.pendingSubscriptions = nil } +func (s *Service) closePollWaiters() { + s.eventPollWaitersLock.Lock() + defer s.eventPollWaitersLock.Unlock() + + for _, v := range s.eventPollWaiters { + v.close() + } + + s.eventPollWaiters = nil +} + func (s *Service) handleEvent(ctx context.Context, lastEventID string, event proton.Event) error { s.log.WithFields(logrus.Fields{ "old": lastEventID, diff --git a/internal/services/userevents/service_test.go b/internal/services/userevents/service_test.go index 802e7281..fae69d50 100644 --- a/internal/services/userevents/service_test.go +++ b/internal/services/userevents/service_test.go @@ -311,6 +311,68 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T group.Wait() } +func TestService_WaitOnEventPublishAfterPause(t *testing.T) { + group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{}) + mockCtrl := gomock.NewController(t) + eventPublisher := mocks2.NewMockEventPublisher(mockCtrl) + eventIDStore := mocks.NewMockEventIDStore(mockCtrl) + eventSource := mocks.NewMockEventSource(mockCtrl) + subscriber := NewMockMessageEventHandler(mockCtrl) + + firstEventID := "EVENT01" + secondEventID := "EVENT02" + messageEvents := []proton.MessageEvent{ + { + EventItem: proton.EventItem{ID: "Message"}, + }, + } + secondEvent := []proton.Event{{ + EventID: secondEventID, + Messages: messageEvents, + }} + + // Event id store expectations. + eventIDStore.EXPECT().Load(gomock.Any()).Times(1).Return(firstEventID, nil) + eventIDStore.EXPECT().Store(gomock.Any(), gomock.Eq(secondEventID)).Times(1).Return(nil) + + // Event Source expectations. + eventSource.EXPECT().GetEvent(gomock.Any(), gomock.Eq(firstEventID)).MinTimes(1).Return(secondEvent, false, nil) + + // Subscriber expectations. + + service := NewService( + "foo", + eventSource, + eventIDStore, + eventPublisher, + time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) + + subscriber.EXPECT().HandleMessageEvents(gomock.Any(), gomock.Eq(messageEvents)).Times(1).DoAndReturn(func(_ context.Context, _ []proton.MessageEvent) error { + waiter := service.PauseWithWaiter() + + go func() { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) + defer cancel() + + err := waiter.WaitPollFinished(ctx) + require.NoError(t, err) + + group.Cancel() + }() + + return nil + }) + + service.Subscribe(NewCallbackSubscriber("foo", EventHandler{MessageHandler: subscriber})) + require.NoError(t, service.Start(context.Background(), group)) + service.Resume() + group.Wait() +} + type CallbackSubscriber struct { handler EventHandler n string