feat(GODT-2803): Add Event Poll Waiter

Wait for the current event poll to finish publishing events after a
request to pause the event loop. This is required to change the gluon
cache directory.
This commit is contained in:
Leander Beernaert
2023-08-11 09:55:21 +02:00
parent a46533dcf2
commit a5500629e5
3 changed files with 154 additions and 0 deletions

View File

@ -0,0 +1,58 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package userevents
import (
"context"
"time"
)
// EventPollWaiter is meant to be used to wait for the event loop to finish processing the current events after
// being paused.
type EventPollWaiter struct {
ch chan struct{}
}
func newEventPollWaiter() *EventPollWaiter {
return &EventPollWaiter{ch: make(chan struct{})}
}
func (e *EventPollWaiter) WaitPollFinished(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-e.ch:
return nil
}
}
func (e *EventPollWaiter) WaitPollFinishedWithDeadline(ctx context.Context, t time.Time) error {
ctx, cancel := context.WithDeadline(ctx, t)
defer cancel()
select {
case <-ctx.Done():
return ctx.Err()
case <-e.ch:
return nil
}
}
func (e *EventPollWaiter) close() {
close(e.ch)
}

View File

@ -61,6 +61,9 @@ type Service struct {
pendingSubscriptionsLock sync.Mutex
pendingSubscriptions []pendingSubscription
eventPollWaiters []*EventPollWaiter
eventPollWaitersLock sync.Mutex
}
func NewService(
@ -114,6 +117,21 @@ func (s *Service) Pause() {
atomic.StoreUint32(&s.paused, 1)
}
// PauseWithWaiter pauses the event polling and returns a waiter to notify when the last event has been published
// after the pause request.
func (s *Service) PauseWithWaiter() *EventPollWaiter {
s.log.Info("Pausing")
atomic.StoreUint32(&s.paused, 1)
waiter := newEventPollWaiter()
s.eventPollWaitersLock.Lock()
s.eventPollWaiters = append(s.eventPollWaiters, waiter)
s.eventPollWaitersLock.Unlock()
return waiter
}
// Resume resumes the event polling.
func (s *Service) Resume() {
atomic.StoreUint32(&s.paused, 0)
@ -163,6 +181,7 @@ func (s *Service) run(ctx context.Context, lastEventID string) {
return
case <-s.timer.C:
if s.IsPaused() {
s.closePollWaiters()
continue
}
}
@ -223,6 +242,10 @@ func (s *Service) run(ctx context.Context, lastEventID string) {
}
lastEventID = newEventID
if s.IsPaused() {
s.closePollWaiters()
}
}
}
@ -249,6 +272,17 @@ func (s *Service) Close() {
s.pendingSubscriptions = nil
}
func (s *Service) closePollWaiters() {
s.eventPollWaitersLock.Lock()
defer s.eventPollWaitersLock.Unlock()
for _, v := range s.eventPollWaiters {
v.close()
}
s.eventPollWaiters = nil
}
func (s *Service) handleEvent(ctx context.Context, lastEventID string, event proton.Event) error {
s.log.WithFields(logrus.Fields{
"old": lastEventID,

View File

@ -311,6 +311,68 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T
group.Wait()
}
func TestService_WaitOnEventPublishAfterPause(t *testing.T) {
group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{})
mockCtrl := gomock.NewController(t)
eventPublisher := mocks2.NewMockEventPublisher(mockCtrl)
eventIDStore := mocks.NewMockEventIDStore(mockCtrl)
eventSource := mocks.NewMockEventSource(mockCtrl)
subscriber := NewMockMessageEventHandler(mockCtrl)
firstEventID := "EVENT01"
secondEventID := "EVENT02"
messageEvents := []proton.MessageEvent{
{
EventItem: proton.EventItem{ID: "Message"},
},
}
secondEvent := []proton.Event{{
EventID: secondEventID,
Messages: messageEvents,
}}
// Event id store expectations.
eventIDStore.EXPECT().Load(gomock.Any()).Times(1).Return(firstEventID, nil)
eventIDStore.EXPECT().Store(gomock.Any(), gomock.Eq(secondEventID)).Times(1).Return(nil)
// Event Source expectations.
eventSource.EXPECT().GetEvent(gomock.Any(), gomock.Eq(firstEventID)).MinTimes(1).Return(secondEvent, false, nil)
// Subscriber expectations.
service := NewService(
"foo",
eventSource,
eventIDStore,
eventPublisher,
time.Millisecond,
time.Millisecond,
time.Second,
async.NoopPanicHandler{},
)
subscriber.EXPECT().HandleMessageEvents(gomock.Any(), gomock.Eq(messageEvents)).Times(1).DoAndReturn(func(_ context.Context, _ []proton.MessageEvent) error {
waiter := service.PauseWithWaiter()
go func() {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
err := waiter.WaitPollFinished(ctx)
require.NoError(t, err)
group.Cancel()
}()
return nil
})
service.Subscribe(NewCallbackSubscriber("foo", EventHandler{MessageHandler: subscriber}))
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
group.Wait()
}
type CallbackSubscriber struct {
handler EventHandler
n string