fix(GODT-2822): Update smtp and events service to use ordered cancel

This commit is contained in:
Leander Beernaert
2023-07-28 14:55:30 +02:00
parent 0048767022
commit 11ebb16933
3 changed files with 38 additions and 27 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/go-proton-api"
bridgelogging "github.com/ProtonMail/proton-bridge/v3/internal/logging"
"github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
"github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
"github.com/ProtonMail/proton-bridge/v3/internal/services/userevents"
"github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
@ -122,6 +123,12 @@ func (s *Service) SetAddressMode(ctx context.Context, mode usertypes.AddressMode
return err
}
func (s *Service) Resync(ctx context.Context) error {
_, err := s.cpc.Send(ctx, &resyncReq{})
return err
}
func (s *Service) checkAuth(ctx context.Context, email string, password []byte) (string, error) {
return cpc.SendTyped[string](ctx, s.cpc, &checkAuthReq{
email: email,
@ -129,9 +136,9 @@ func (s *Service) checkAuth(ctx context.Context, email string, password []byte)
})
}
func (s *Service) Start(group *async.Group) {
func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) {
s.log.Debug("Starting service")
group.Once(func(ctx context.Context) {
group.Go(ctx, s.userID, "smtp-service", func(ctx context.Context) {
logging.DoAnnotated(ctx, func(ctx context.Context) {
s.run(ctx)
}, logging.Labels{
@ -146,8 +153,8 @@ func (s *Service) UserID() string {
}
func (s *Service) run(ctx context.Context) {
s.log.Debug("Starting service main loop")
defer s.log.Debug("Exiting service main loop")
s.log.Info("Starting service main loop")
defer s.log.Info("Exiting service main loop")
defer s.cpc.Close()
subscription := userevents.Subscription{
@ -185,6 +192,10 @@ func (s *Service) run(ctx context.Context) {
addrID, err := s.identityState.CheckAuth(r.email, r.password, s.bridgePassProvider, s.telemetry)
request.Reply(ctx, addrID, err)
case *resyncReq:
err := s.identityState.OnRefreshEvent(ctx)
request.Reply(ctx, nil, err)
default:
s.log.Error("Received unknown request")
}
@ -249,3 +260,5 @@ type checkAuthReq struct {
email string
password []byte
}
type resyncReq struct{}

View File

@ -32,6 +32,7 @@ import (
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/proton-bridge/v3/internal"
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
"github.com/bradenaw/juniper/xslices"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
@ -176,12 +177,12 @@ func (s *Service) Resume() {
atomic.StoreUint32(&s.paused, 0)
}
// IsPaused return true if the service is paused
// IsPaused return true if the service is paused.
func (s *Service) IsPaused() bool {
return atomic.LoadUint32(&s.paused) == 1
}
func (s *Service) Start(ctx context.Context, group *async.Group) error {
func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error {
lastEventID, err := s.eventIDStore.Load(ctx)
if err != nil {
return fmt.Errorf("failed to load last event id: %w", err)
@ -201,7 +202,7 @@ func (s *Service) Start(ctx context.Context, group *async.Group) error {
lastEventID = eventID
}
group.Once(func(ctx context.Context) {
group.Go(ctx, s.userID, "event-service", func(ctx context.Context) {
s.run(ctx, lastEventID)
})
@ -209,9 +210,10 @@ func (s *Service) Start(ctx context.Context, group *async.Group) error {
}
func (s *Service) run(ctx context.Context, lastEventID string) {
s.log.Debugf("Starting service Last EventID=%v", lastEventID)
s.log.Infof("Starting service Last EventID=%v", lastEventID)
defer s.close()
defer s.log.Debug("Exiting service")
defer s.log.Info("Exiting service")
defer s.Close()
for {
select {
@ -491,9 +493,3 @@ func (s *Service) removeSubscription(subscription Subscription) {
func (s *Service) close() {
s.timer.Stop()
}
type pauseRequest struct{}
type resumeRequest struct{}
type isPausedRequest struct{}

View File

@ -27,6 +27,7 @@ import (
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/proton-bridge/v3/internal/events"
mocks2 "github.com/ProtonMail/proton-bridge/v3/internal/events/mocks"
"github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
"github.com/ProtonMail/proton-bridge/v3/internal/services/userevents/mocks"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
@ -40,7 +41,7 @@ func TestService_EventIDLoadStore(t *testing.T) {
// * Start event poll loop
// * Get new event id, store it in vault
// * Try to poll new event it, but context is cancelled
group := async.NewGroup(context.Background(), &async.NoopPanicHandler{})
group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{})
mockCtrl := gomock.NewController(t)
eventPublisher := mocks2.NewMockEventPublisher(mockCtrl)
eventIDStore := mocks.NewMockEventIDStore(mockCtrl)
@ -68,11 +69,11 @@ func TestService_EventIDLoadStore(t *testing.T) {
service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{})
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
group.WaitToFinish()
group.Wait()
}
func TestService_RetryEventOnNonCatastrophicFailure(t *testing.T) {
group := async.NewGroup(context.Background(), &async.NoopPanicHandler{})
group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{})
mockCtrl := gomock.NewController(t)
eventPublisher := mocks2.NewMockEventPublisher(mockCtrl)
eventIDStore := mocks.NewMockEventIDStore(mockCtrl)
@ -114,11 +115,11 @@ func TestService_RetryEventOnNonCatastrophicFailure(t *testing.T) {
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
group.WaitToFinish()
group.Wait()
}
func TestService_OnBadEventServiceIsPaused(t *testing.T) {
group := async.NewGroup(context.Background(), &async.NoopPanicHandler{})
group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{})
mockCtrl := gomock.NewController(t)
eventPublisher := mocks2.NewMockEventPublisher(mockCtrl)
eventIDStore := mocks.NewMockEventIDStore(mockCtrl)
@ -158,7 +159,7 @@ func TestService_OnBadEventServiceIsPaused(t *testing.T) {
EventInfo: secondEvent[0].String(),
Error: badEventErr,
}).Do(func(_ context.Context, event events.Event) {
group.Once(func(_ context.Context) {
group.Go(context.Background(), "", "", func(_ context.Context) {
// Use background context to avoid having the request cancelled
require.True(t, service.IsPaused())
group.Cancel()
@ -168,11 +169,11 @@ func TestService_OnBadEventServiceIsPaused(t *testing.T) {
service.Subscribe(Subscription{Messages: subscriber})
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
group.WaitToFinish()
group.Wait()
}
func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T) {
group := async.NewGroup(context.Background(), &async.NoopPanicHandler{})
group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{})
mockCtrl := gomock.NewController(t)
eventPublisher := mocks2.NewMockEventPublisher(mockCtrl)
eventIDStore := mocks.NewMockEventIDStore(mockCtrl)
@ -207,6 +208,7 @@ func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T
// Subscriber expectations.
subscriber.EXPECT().name().AnyTimes().Return("Foo")
subscriber.EXPECT().cancel().Times(1)
subscriber.EXPECT().close().Times(1)
subscriber.EXPECT().handle(gomock.Any(), gomock.Eq(messageEvents)).Times(1).DoAndReturn(func(_ context.Context, _ []proton.MessageEvent) error {
service.Unsubscribe(Subscription{Messages: subscriber})
return nil
@ -215,11 +217,11 @@ func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T
service.Subscribe(Subscription{Messages: subscriber})
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
group.WaitToFinish()
group.Wait()
}
func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T) {
group := async.NewGroup(context.Background(), &async.NoopPanicHandler{})
group := orderedtasks.NewOrderedCancelGroup(async.NoopPanicHandler{})
mockCtrl := gomock.NewController(t)
eventPublisher := mocks2.NewMockEventPublisher(mockCtrl)
eventIDStore := mocks.NewMockEventIDStore(mockCtrl)
@ -253,7 +255,7 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T
service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{})
// start subscriber
group.Once(func(_ context.Context) {
group.Go(context.Background(), "", "", func(_ context.Context) {
defer service.Unsubscribe(Subscription{Messages: subscriber})
// Simulate the reception of an event, but it is never handled due to unexpected exit
@ -263,5 +265,5 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T
service.Subscribe(Subscription{Messages: subscriber})
require.NoError(t, service.Start(context.Background(), group))
service.Resume()
group.WaitToFinish()
group.Wait()
}