diff --git a/internal/bridge/refresh_test.go b/internal/bridge/refresh_test.go index e8edcd7d..61761db7 100644 --- a/internal/bridge/refresh_test.go +++ b/internal/bridge/refresh_test.go @@ -84,6 +84,11 @@ func TestBridge_Refresh(t *testing.T) { withBridge(ctx, t, s.GetHostURL(), netCtl, locator, storeKey, func(b *bridge.Bridge, mocks *bridge.Mocks) { mocks.Reporter.EXPECT().ReportMessageWithContext(gomock.Any(), gomock.Any()).AnyTimes() + // Wait for refresh event first + refreshCh, refreshChDone := chToType[events.Event, events.UserRefreshed](b.GetEvents(events.UserRefreshed{})) + defer refreshChDone() + require.Equal(t, userID, (<-refreshCh).UserID) + // Then sync event syncCh, done := chToType[events.Event, events.SyncFinished](b.GetEvents(events.SyncFinished{})) defer done() diff --git a/internal/services/userevents/service.go b/internal/services/userevents/service.go index 45c7663d..72a7612f 100644 --- a/internal/services/userevents/service.go +++ b/internal/services/userevents/service.go @@ -52,7 +52,7 @@ type Service struct { eventIDStore EventIDStore log *logrus.Entry eventPublisher events.EventPublisher - timer *time.Ticker + timer *proton.Ticker eventTimeout time.Duration paused uint32 panicHandler async.PanicHandler @@ -74,6 +74,7 @@ func NewService( store EventIDStore, eventPublisher events.EventPublisher, pollPeriod time.Duration, + jitter time.Duration, eventTimeout time.Duration, panicHandler async.PanicHandler, ) *Service { @@ -86,7 +87,7 @@ func NewService( "user": userID, }), eventPublisher: eventPublisher, - timer: time.NewTicker(pollPeriod), + timer: proton.NewTicker(pollPeriod, jitter, panicHandler), paused: 1, eventTimeout: eventTimeout, panicHandler: panicHandler, @@ -209,7 +210,7 @@ func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGr func (s *Service) run(ctx context.Context, lastEventID string) { s.log.Infof("Starting service Last EventID=%v", lastEventID) - defer s.close() + defer s.timer.Stop() defer s.log.Info("Exiting service") defer s.Close() @@ -488,10 +489,6 @@ func (s *Service) removeSubscription(subscription Subscription) { } } -func (s *Service) close() { - s.timer.Stop() -} - type pendingOp int const ( diff --git a/internal/services/userevents/service_handle_event_error_test.go b/internal/services/userevents/service_handle_event_error_test.go index 62539416..563cefdf 100644 --- a/internal/services/userevents/service_handle_event_error_test.go +++ b/internal/services/userevents/service_handle_event_error_test.go @@ -39,7 +39,16 @@ func TestServiceHandleEventError_SubscriberEventUnwrapping(t *testing.T) { eventPublisher := mocks.NewMockEventPublisher(mockCtrl) eventIDStore := NewInMemoryEventIDStore() - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) lastEventID := "PrevEvent" event := proton.Event{EventID: "MyEvent"} @@ -67,7 +76,16 @@ func TestServiceHandleEventError_BadEventPutsServiceOnPause(t *testing.T) { eventPublisher := mocks.NewMockEventPublisher(mockCtrl) eventIDStore := NewInMemoryEventIDStore() - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) service.Resume() lastEventID := "PrevEvent" event := proton.Event{EventID: "MyEvent"} @@ -91,7 +109,16 @@ func TestServiceHandleEventError_BadEventFromPublishTimeout(t *testing.T) { eventPublisher := mocks.NewMockEventPublisher(mockCtrl) eventIDStore := NewInMemoryEventIDStore() - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) lastEventID := "PrevEvent" event := proton.Event{EventID: "MyEvent"} err := ErrPublishTimeoutExceeded @@ -112,7 +139,16 @@ func TestServiceHandleEventError_NoBadEventCheck(t *testing.T) { eventPublisher := mocks.NewMockEventPublisher(mockCtrl) eventIDStore := NewInMemoryEventIDStore() - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) lastEventID := "PrevEvent" event := proton.Event{EventID: "MyEvent"} _, _ = service.handleEventError(context.Background(), lastEventID, event, context.Canceled) @@ -128,7 +164,16 @@ func TestServiceHandleEventError_JsonUnmarshalEventProducesUncategorizedErrorEve eventPublisher := mocks.NewMockEventPublisher(mockCtrl) eventIDStore := NewInMemoryEventIDStore() - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) lastEventID := "PrevEvent" event := proton.Event{EventID: "MyEvent"} err := &json.UnmarshalTypeError{} diff --git a/internal/services/userevents/service_handle_event_test.go b/internal/services/userevents/service_handle_event_test.go index d64d9da2..be966817 100644 --- a/internal/services/userevents/service_handle_event_test.go +++ b/internal/services/userevents/service_handle_event_test.go @@ -58,7 +58,16 @@ func TestServiceHandleEvent_CheckEventCategoriesHandledInOrder(t *testing.T) { secondRefreshHandler := NewMockRefreshSubscriber(mockCtrl) secondRefreshHandler.EXPECT().handle(gomock.Any(), gomock.Any()).After(userSpaceCall).Times(1).Return(nil) - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) service.addSubscription(Subscription{ User: userHandler, @@ -106,7 +115,16 @@ func TestServiceHandleEvent_CheckEventFailureCausesError(t *testing.T) { messageHandler := NewMockMessageSubscriber(mockCtrl) - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) service.addSubscription(Subscription{ Address: addressHandler, @@ -133,7 +151,16 @@ func TestServiceHandleEvent_CheckEventFailureCausesErrorParallel(t *testing.T) { addressHandler2 := NewMockAddressSubscriber(mockCtrl) addressHandler2.EXPECT().handle(gomock.Any(), gomock.Any()).MaxTimes(1).Return(nil) - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) service.addSubscription(Subscription{ Address: addressHandler, @@ -173,7 +200,16 @@ func TestServiceHandleEvent_SubscriberTimeout(t *testing.T) { } }).MaxTimes(1) - service := NewService("foo", &NullEventSource{}, eventIDStore, eventPublisher, 100*time.Millisecond, 500*time.Millisecond, async.NoopPanicHandler{}) + service := NewService( + "foo", + &NullEventSource{}, + eventIDStore, + eventPublisher, + 100*time.Millisecond, + time.Millisecond, + 500*time.Millisecond, + async.NoopPanicHandler{}, + ) service.addSubscription(Subscription{ Address: addressHandler, diff --git a/internal/services/userevents/service_test.go b/internal/services/userevents/service_test.go index f40316f2..f9702714 100644 --- a/internal/services/userevents/service_test.go +++ b/internal/services/userevents/service_test.go @@ -66,7 +66,16 @@ func TestService_EventIDLoadStore(t *testing.T) { eventSource.EXPECT().GetLatestEventID(gomock.Any()).Times(1).Return(firstEventID, nil) eventSource.EXPECT().GetEvent(gomock.Any(), gomock.Eq(firstEventID)).MinTimes(1).Return(secondEvent, false, nil) - service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + eventSource, + eventIDStore, + eventPublisher, + time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) require.NoError(t, service.Start(context.Background(), group)) service.Resume() group.Wait() @@ -110,7 +119,16 @@ func TestService_RetryEventOnNonCatastrophicFailure(t *testing.T) { subscriber.EXPECT().handle(gomock.Any(), gomock.Eq(messageEvents)).After(firstCall).Times(1).Return(nil) } - service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + eventSource, + eventIDStore, + eventPublisher, + time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) service.Subscribe(Subscription{Messages: subscriber}) require.NoError(t, service.Start(context.Background(), group)) @@ -149,7 +167,16 @@ func TestService_OnBadEventServiceIsPaused(t *testing.T) { subscriber.EXPECT().name().AnyTimes().Return("Foo") subscriber.EXPECT().handle(gomock.Any(), gomock.Eq(messageEvents)).Times(1).Return(badEventErr) - service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + eventSource, + eventIDStore, + eventPublisher, + time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) // Event publisher expectations. eventPublisher.EXPECT().PublishEvent(gomock.Any(), events.UserBadEvent{ @@ -203,7 +230,16 @@ func TestService_UnsubscribeDuringEventHandlingDoesNotCauseDeadlock(t *testing.T // Event Source expectations. eventSource.EXPECT().GetEvent(gomock.Any(), gomock.Eq(firstEventID)).MinTimes(1).Return(secondEvent, false, nil) - service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + eventSource, + eventIDStore, + eventPublisher, + time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) // Subscriber expectations. subscriber.EXPECT().name().AnyTimes().Return("Foo") @@ -252,7 +288,16 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T eventSource.EXPECT().GetEvent(gomock.Any(), gomock.Eq(firstEventID)).MinTimes(1).Return(secondEvent, false, nil) eventSource.EXPECT().GetEvent(gomock.Any(), gomock.Eq(secondEventID)).AnyTimes().Return(secondEvent, false, nil) - service := NewService("foo", eventSource, eventIDStore, eventPublisher, 1*time.Millisecond, time.Second, async.NoopPanicHandler{}) + service := NewService( + "foo", + eventSource, + eventIDStore, + eventPublisher, + time.Millisecond, + time.Millisecond, + time.Second, + async.NoopPanicHandler{}, + ) // start subscriber group.Go(context.Background(), "", "", func(_ context.Context) { diff --git a/internal/user/user.go b/internal/user/user.go index 8e68057f..2d6f9d02 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -206,6 +206,7 @@ func newImpl( userevents.NewVaultEventIDStore(encVault), user, EventPeriod, + EventJitter, 5*time.Minute, crashHandler, )