From 11ebb169331b9673692e5d57f3eee78fd69d9b9a Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Fri, 28 Jul 2023 14:55:30 +0200 Subject: [PATCH] fix(GODT-2822): Update smtp and events service to use ordered cancel --- internal/services/smtp/service.go | 21 +++++++++++++--- internal/services/userevents/service.go | 18 ++++++-------- internal/services/userevents/service_test.go | 26 +++++++++++--------- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/internal/services/smtp/service.go b/internal/services/smtp/service.go index d1415312..29a571da 100644 --- a/internal/services/smtp/service.go +++ b/internal/services/smtp/service.go @@ -28,6 +28,7 @@ import ( "github.com/ProtonMail/gluon/reporter" "github.com/ProtonMail/go-proton-api" bridgelogging "github.com/ProtonMail/proton-bridge/v3/internal/logging" + "github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks" "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder" "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents" "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" @@ -122,6 +123,12 @@ func (s *Service) SetAddressMode(ctx context.Context, mode usertypes.AddressMode return err } +func (s *Service) Resync(ctx context.Context) error { + _, err := s.cpc.Send(ctx, &resyncReq{}) + + return err +} + func (s *Service) checkAuth(ctx context.Context, email string, password []byte) (string, error) { return cpc.SendTyped[string](ctx, s.cpc, &checkAuthReq{ email: email, @@ -129,9 +136,9 @@ func (s *Service) checkAuth(ctx context.Context, email string, password []byte) }) } -func (s *Service) Start(group *async.Group) { +func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) { s.log.Debug("Starting service") - group.Once(func(ctx context.Context) { + group.Go(ctx, s.userID, "smtp-service", func(ctx context.Context) { logging.DoAnnotated(ctx, func(ctx context.Context) { s.run(ctx) }, logging.Labels{ @@ -146,8 +153,8 @@ func (s *Service) UserID() string { } func (s *Service) run(ctx context.Context) { - s.log.Debug("Starting service main loop") - defer s.log.Debug("Exiting service main loop") + s.log.Info("Starting service main loop") + defer s.log.Info("Exiting service main loop") defer s.cpc.Close() subscription := userevents.Subscription{ @@ -185,6 +192,10 @@ func (s *Service) run(ctx context.Context) { addrID, err := s.identityState.CheckAuth(r.email, r.password, s.bridgePassProvider, s.telemetry) request.Reply(ctx, addrID, err) + case *resyncReq: + err := s.identityState.OnRefreshEvent(ctx) + request.Reply(ctx, nil, err) + default: s.log.Error("Received unknown request") } @@ -249,3 +260,5 @@ type checkAuthReq struct { email string password []byte } + +type resyncReq struct{} diff --git a/internal/services/userevents/service.go b/internal/services/userevents/service.go index 8f048136..8640c135 100644 --- a/internal/services/userevents/service.go +++ b/internal/services/userevents/service.go @@ -32,6 +32,7 @@ import ( "github.com/ProtonMail/go-proton-api" "github.com/ProtonMail/proton-bridge/v3/internal" "github.com/ProtonMail/proton-bridge/v3/internal/events" + "github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks" "github.com/bradenaw/juniper/xslices" "github.com/sirupsen/logrus" "golang.org/x/exp/slices" @@ -176,12 +177,12 @@ func (s *Service) Resume() { atomic.StoreUint32(&s.paused, 0) } -// IsPaused return true if the service is paused +// IsPaused return true if the service is paused. func (s *Service) IsPaused() bool { return atomic.LoadUint32(&s.paused) == 1 } -func (s *Service) Start(ctx context.Context, group *async.Group) error { +func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error { lastEventID, err := s.eventIDStore.Load(ctx) if err != nil { return fmt.Errorf("failed to load last event id: %w", err) @@ -201,7 +202,7 @@ func (s *Service) Start(ctx context.Context, group *async.Group) error { lastEventID = eventID } - group.Once(func(ctx context.Context) { + group.Go(ctx, s.userID, "event-service", func(ctx context.Context) { s.run(ctx, lastEventID) }) @@ -209,9 +210,10 @@ func (s *Service) Start(ctx context.Context, group *async.Group) error { } func (s *Service) run(ctx context.Context, lastEventID string) { - s.log.Debugf("Starting service Last EventID=%v", lastEventID) + s.log.Infof("Starting service Last EventID=%v", lastEventID) defer s.close() - defer s.log.Debug("Exiting service") + defer s.log.Info("Exiting service") + defer s.Close() for { select { @@ -491,9 +493,3 @@ func (s *Service) removeSubscription(subscription Subscription) { func (s *Service) close() { s.timer.Stop() } - -type pauseRequest struct{} - -type resumeRequest struct{} - -type isPausedRequest struct{} diff --git a/internal/services/userevents/service_test.go b/internal/services/userevents/service_test.go index 32d67131..f40316f2 100644 --- a/internal/services/userevents/service_test.go +++ b/internal/services/userevents/service_test.go @@ -27,6 +27,7 @@ import ( "github.com/ProtonMail/go-proton-api" "github.com/ProtonMail/proton-bridge/v3/internal/events" mocks2 "github.com/ProtonMail/proton-bridge/v3/internal/events/mocks" + "github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks" "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents/mocks" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -40,7 +41,7 @@ func TestService_EventIDLoadStore(t *testing.T) { // * Start event poll loop // * Get new event id, store it in vault // * Try to poll new event it, but context is cancelled - group := async.NewGroup(context.Background(), &async.NoopPanicHandler{}) + group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{}) mockCtrl := gomock.NewController(t) eventPublisher := mocks2.NewMockEventPublisher(mockCtrl) eventIDStore := mocks.NewMockEventIDStore(mockCtrl) @@ -68,11 +69,11 @@ func TestService_EventIDLoadStore(t *testing.T) { service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{}) require.NoError(t, service.Start(context.Background(), group)) service.Resume() - group.WaitToFinish() + group.Wait() } func TestService_RetryEventOnNonCatastrophicFailure(t *testing.T) { - group := async.NewGroup(context.Background(), &async.NoopPanicHandler{}) + group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{}) mockCtrl := gomock.NewController(t) eventPublisher := mocks2.NewMockEventPublisher(mockCtrl) eventIDStore := mocks.NewMockEventIDStore(mockCtrl) @@ -114,11 +115,11 @@ func TestService_RetryEventOnNonCatastrophicFailure(t *testing.T) { require.NoError(t, service.Start(context.Background(), group)) service.Resume() - group.WaitToFinish() + group.Wait() } func TestService_OnBadEventServiceIsPaused(t *testing.T) { - group := async.NewGroup(context.Background(), &async.NoopPanicHandler{}) + group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{}) mockCtrl := gomock.NewController(t) eventPublisher := mocks2.NewMockEventPublisher(mockCtrl) eventIDStore := mocks.NewMockEventIDStore(mockCtrl) @@ -158,7 +159,7 @@ func TestService_OnBadEventServiceIsPaused(t *testing.T) { EventInfo: secondEvent[0].String(), Error: badEventErr, }).Do(func(_ context.Context, event events.Event) { - group.Once(func(_ context.Context) { + group.Go(context.Background(), "", "", func(_ context.Context) { // Use background context to avoid having the request cancelled require.True(t, service.IsPaused()) group.Cancel() @@ -168,11 +169,11 @@ func TestService_OnBadEventServiceIsPaused(t *testing.T) { service.Subscribe(Subscription{Messages: subscriber}) require.NoError(t, service.Start(context.Background(), group)) service.Resume() - group.WaitToFinish() + group.Wait() } func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T) { - group := async.NewGroup(context.Background(), &async.NoopPanicHandler{}) + group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{}) mockCtrl := gomock.NewController(t) eventPublisher := mocks2.NewMockEventPublisher(mockCtrl) eventIDStore := mocks.NewMockEventIDStore(mockCtrl) @@ -207,6 +208,7 @@ func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T // Subscriber expectations. subscriber.EXPECT().name().AnyTimes().Return("Foo") subscriber.EXPECT().cancel().Times(1) + subscriber.EXPECT().close().Times(1) subscriber.EXPECT().handle(gomock.Any(), gomock.Eq(messageEvents)).Times(1).DoAndReturn(func(_ context.Context, _ []proton.MessageEvent) error { service.Unsubscribe(Subscription{Messages: subscriber}) return nil @@ -215,11 +217,11 @@ func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T service.Subscribe(Subscription{Messages: subscriber}) require.NoError(t, service.Start(context.Background(), group)) service.Resume() - group.WaitToFinish() + group.Wait() } func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T) { - group := async.NewGroup(context.Background(), &async.NoopPanicHandler{}) + group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{}) mockCtrl := gomock.NewController(t) eventPublisher := mocks2.NewMockEventPublisher(mockCtrl) eventIDStore := mocks.NewMockEventIDStore(mockCtrl) @@ -253,7 +255,7 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{}) // start subscriber - group.Once(func(_ context.Context) { + group.Go(context.Background(), "", "", func(_ context.Context) { defer service.Unsubscribe(Subscription{Messages: subscriber}) // Simulate the reception of an event, but it is never handled due to unexpected exit @@ -263,5 +265,5 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T service.Subscribe(Subscription{Messages: subscriber}) require.NoError(t, service.Start(context.Background(), group)) service.Resume() - group.WaitToFinish() + group.Wait() }