forked from Silverfish/proton-bridge
fix(GODT-2909): Remove Timeout on event publish
While good intentioned, this change causes issues when the computer goes to sleep and a user resumes after the timeout interval.
This commit is contained in:
@ -338,7 +338,7 @@ func (s *Service) handleEvent(ctx context.Context, lastEventID string, event pro
|
|||||||
s.log.Info("Received refresh event")
|
s.log.Info("Received refresh event")
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.subscriberList.PublishParallel(ctx, event, s.panicHandler, s.eventTimeout)
|
return s.subscriberList.PublishParallel(ctx, event, s.panicHandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
func unpackPublisherError(err error) (string, error) {
|
func unpackPublisherError(err error) (string, error) {
|
||||||
|
|||||||
@ -182,56 +182,3 @@ func TestServiceHandleEvent_CheckEventFailureCausesErrorParallel(t *testing.T) {
|
|||||||
require.True(t, errors.As(err, &publisherErr))
|
require.True(t, errors.As(err, &publisherErr))
|
||||||
require.Equal(t, publisherErr.subscriber, subscription)
|
require.Equal(t, publisherErr.subscriber, subscription)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServiceHandleEvent_SubscriberTimeout(t *testing.T) {
|
|
||||||
mockCtrl := gomock.NewController(t)
|
|
||||||
|
|
||||||
eventPublisher := mocks.NewMockEventPublisher(mockCtrl)
|
|
||||||
eventIDStore := NewInMemoryEventIDStore()
|
|
||||||
|
|
||||||
addressHandler := NewMockAddressEventHandler(mockCtrl)
|
|
||||||
addressHandler.EXPECT().HandleAddressEvents(gomock.Any(), gomock.Any()).MaxTimes(1).Return(nil)
|
|
||||||
|
|
||||||
addressHandler2 := NewMockAddressEventHandler(mockCtrl)
|
|
||||||
addressHandler2.EXPECT().HandleAddressEvents(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, _ []proton.AddressEvent) error {
|
|
||||||
timer := time.NewTimer(time.Second)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-timer.C:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}).MaxTimes(1)
|
|
||||||
|
|
||||||
service := NewService(
|
|
||||||
"foo",
|
|
||||||
&NullEventSource{},
|
|
||||||
eventIDStore,
|
|
||||||
eventPublisher,
|
|
||||||
100*time.Millisecond,
|
|
||||||
time.Millisecond,
|
|
||||||
500*time.Millisecond,
|
|
||||||
async.NoopPanicHandler{},
|
|
||||||
)
|
|
||||||
|
|
||||||
subscription := NewCallbackSubscriber("test", EventHandler{
|
|
||||||
AddressHandler: addressHandler2,
|
|
||||||
})
|
|
||||||
|
|
||||||
service.addSubscription(subscription)
|
|
||||||
|
|
||||||
service.addSubscription(NewCallbackSubscriber("test2", EventHandler{
|
|
||||||
AddressHandler: addressHandler,
|
|
||||||
}))
|
|
||||||
|
|
||||||
// Simulate 1st refresh.
|
|
||||||
err := service.handleEvent(context.Background(), "", proton.Event{Addresses: []proton.AddressEvent{{}}})
|
|
||||||
require.Error(t, err)
|
|
||||||
if publisherErr := new(eventPublishError); errors.As(err, &publisherErr) {
|
|
||||||
require.Equal(t, publisherErr.subscriber, subscription)
|
|
||||||
require.True(t, errors.Is(publisherErr.error, ErrPublishTimeoutExceeded))
|
|
||||||
} else {
|
|
||||||
require.True(t, errors.Is(err, ErrPublishTimeoutExceeded))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@ -22,7 +22,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ProtonMail/gluon/async"
|
"github.com/ProtonMail/gluon/async"
|
||||||
"github.com/ProtonMail/go-proton-api"
|
"github.com/ProtonMail/go-proton-api"
|
||||||
@ -88,22 +87,19 @@ func (p publishError[T]) Error() string {
|
|||||||
return fmt.Sprintf("Event publish failed on (%v): %v", p.subscriber.name(), p.error.Error())
|
return fmt.Sprintf("Event publish failed on (%v): %v", p.subscriber.name(), p.error.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriberList[T]) Publish(ctx context.Context, event T, timeout time.Duration) error {
|
func (s *subscriberList[T]) Publish(ctx context.Context, event T) error {
|
||||||
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(timeout))
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
for _, subscriber := range s.subscribers {
|
for _, subscriber := range s.subscribers {
|
||||||
if err := subscriber.handle(ctx, event); err != nil {
|
if err := subscriber.handle(ctx, event); err != nil {
|
||||||
return &publishError[T]{
|
return &publishError[T]{
|
||||||
subscriber: subscriber,
|
subscriber: subscriber,
|
||||||
error: mapContextTimeoutError(err),
|
error: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ctx.Err(); err != nil {
|
if err := ctx.Err(); err != nil {
|
||||||
return &publishError[T]{
|
return &publishError[T]{
|
||||||
subscriber: subscriber,
|
subscriber: subscriber,
|
||||||
error: mapContextTimeoutError(err),
|
error: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -111,40 +107,28 @@ func (s *subscriberList[T]) Publish(ctx context.Context, event T, timeout time.D
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func mapContextTimeoutError(err error) error {
|
|
||||||
if errors.Is(err, context.DeadlineExceeded) {
|
|
||||||
return ErrPublishTimeoutExceeded
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriberList[T]) PublishParallel(
|
func (s *subscriberList[T]) PublishParallel(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
event T,
|
event T,
|
||||||
panicHandler async.PanicHandler,
|
panicHandler async.PanicHandler,
|
||||||
timeout time.Duration,
|
|
||||||
) error {
|
) error {
|
||||||
if len(s.subscribers) <= 1 {
|
if len(s.subscribers) <= 1 {
|
||||||
return s.Publish(ctx, event, timeout)
|
return s.Publish(ctx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(timeout))
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
err := parallel.DoContext(ctx, runtime.NumCPU()/2, len(s.subscribers), func(ctx context.Context, index int) error {
|
err := parallel.DoContext(ctx, runtime.NumCPU()/2, len(s.subscribers), func(ctx context.Context, index int) error {
|
||||||
defer async.HandlePanic(panicHandler)
|
defer async.HandlePanic(panicHandler)
|
||||||
if err := s.subscribers[index].handle(ctx, event); err != nil {
|
if err := s.subscribers[index].handle(ctx, event); err != nil {
|
||||||
return &publishError[T]{
|
return &publishError[T]{
|
||||||
subscriber: s.subscribers[index],
|
subscriber: s.subscribers[index],
|
||||||
error: mapContextTimeoutError(err),
|
error: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return mapContextTimeoutError(err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
type ChanneledSubscriber[T any] struct {
|
type ChanneledSubscriber[T any] struct {
|
||||||
|
|||||||
Reference in New Issue
Block a user