diff --git a/internal/services/userevents/service_test.go b/internal/services/userevents/service_test.go index fc4afc25..c858321e 100644 --- a/internal/services/userevents/service_test.go +++ b/internal/services/userevents/service_test.go @@ -226,7 +226,7 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T eventPublisher := mocks2.NewMockEventPublisher(mockCtrl) eventIDStore := mocks.NewMockEventIDStore(mockCtrl) eventSource := mocks.NewMockEventSource(mockCtrl) - subscriber := newChanneledSubscriber[proton.MessageEvent]("My subscriber") + subscriber := NewMessageSubscriber("My subscriber") firstEventID := "EVENT01" secondEventID := "EVENT02" diff --git a/internal/services/userevents/subscriber.go b/internal/services/userevents/subscriber.go index 16e92f92..2dc3aea5 100644 --- a/internal/services/userevents/subscriber.go +++ b/internal/services/userevents/subscriber.go @@ -31,27 +31,34 @@ import ( "golang.org/x/exp/slices" ) -func NewMessageSubscriber(name string) *ChanneledSubscriber[proton.MessageEvent] { - return newChanneledSubscriber[proton.MessageEvent](name) +type AddressChanneledSubscriber = ChanneledSubscriber[[]proton.AddressEvent] +type LabelChanneledSubscriber = ChanneledSubscriber[[]proton.LabelEvent] +type MessageChanneledSubscriber = ChanneledSubscriber[[]proton.MessageEvent] +type UserChanneledSubscriber = ChanneledSubscriber[proton.User] +type RefreshChanneledSubscriber = ChanneledSubscriber[proton.RefreshFlag] +type UserUsedSpaceChanneledSubscriber = ChanneledSubscriber[int] + +func NewMessageSubscriber(name string) *MessageChanneledSubscriber { + return newChanneledSubscriber[[]proton.MessageEvent](name) } -func NewAddressSubscriber(name string) *ChanneledSubscriber[proton.AddressEvent] { - return newChanneledSubscriber[proton.AddressEvent](name) +func NewAddressSubscriber(name string) *AddressChanneledSubscriber { + return newChanneledSubscriber[[]proton.AddressEvent](name) } -func NewLabelSubscriber(name string) *ChanneledSubscriber[proton.LabelEvent] { - return newChanneledSubscriber[proton.LabelEvent](name) +func NewLabelSubscriber(name string) *LabelChanneledSubscriber { + return newChanneledSubscriber[[]proton.LabelEvent](name) } -func NewRefreshSubscriber(name string) *ChanneledSubscriber[struct{}] { - return newChanneledSubscriber[struct{}](name) +func NewRefreshSubscriber(name string) *RefreshChanneledSubscriber { + return newChanneledSubscriber[proton.RefreshFlag](name) } -func NewUserSubscriber(name string) *ChanneledSubscriber[proton.User] { +func NewUserSubscriber(name string) *UserChanneledSubscriber { return newChanneledSubscriber[proton.User](name) } -func NewUserUsedSpaceSubscriber(name string) *ChanneledSubscriber[int] { +func NewUserUsedSpaceSubscriber(name string) *UserUsedSpaceChanneledSubscriber { return newChanneledSubscriber[int](name) } @@ -193,11 +200,11 @@ func newChanneledSubscriber[T any](name string) *ChanneledSubscriber[T] { } type ChanneledSubscriberEvent[T any] struct { - data []T + data T response chan error } -func (c ChanneledSubscriberEvent[T]) Consume(f func([]T) error) { +func (c ChanneledSubscriberEvent[T]) Consume(f func(T) error) { if err := f(c.data); err != nil { c.response <- err } @@ -208,7 +215,7 @@ func (c *ChanneledSubscriber[T]) name() string { //nolint:unused return c.id } -func (c *ChanneledSubscriber[T]) handle(ctx context.Context, event []T) error { //nolint:unused +func (c *ChanneledSubscriber[T]) handle(ctx context.Context, event T) error { //nolint:unused data := &ChanneledSubscriberEvent[T]{ data: event, response: make(chan error), @@ -246,7 +253,7 @@ func (c *ChanneledSubscriber[T]) cancel() { //nolint:unused return } - e.Consume(func(_ []T) error { return nil }) + e.Consume(func(_ T) error { return nil }) } }() } diff --git a/internal/services/userevents/subscriber_test.go b/internal/services/userevents/subscriber_test.go index 2bb0b806..97b4be1e 100644 --- a/internal/services/userevents/subscriber_test.go +++ b/internal/services/userevents/subscriber_test.go @@ -39,13 +39,13 @@ func TestChanneledSubscriber_CtxTimeoutDoesNotBlockFutureEvents(t *testing.T) { defer wg.Done() // Send one event, that succeeds. - require.NoError(t, subscriber.handle(context.Background(), []int{30})) + require.NoError(t, subscriber.handle(context.Background(), 30)) // Add an impossible deadline that fails immediately to simulate on event taking too long to send. ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Microsecond)) defer cancel() - err := subscriber.handle(ctx, []int{20}) + err := subscriber.handle(ctx, 20) require.Error(t, err) require.True(t, errors.Is(err, context.DeadlineExceeded)) }() @@ -53,8 +53,8 @@ func TestChanneledSubscriber_CtxTimeoutDoesNotBlockFutureEvents(t *testing.T) { // Receive first event. Notify success. event, ok := <-subscriber.OnEventCh() require.True(t, ok) - event.Consume(func(event []int) error { - require.Equal(t, []int{30}, event) + event.Consume(func(event int) error { + require.Equal(t, 30, event) return nil }) wg.Wait() @@ -63,13 +63,13 @@ func TestChanneledSubscriber_CtxTimeoutDoesNotBlockFutureEvents(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - require.NoError(t, subscriber.handle(context.Background(), []int{40})) + require.NoError(t, subscriber.handle(context.Background(), 40)) }() event, ok = <-subscriber.OnEventCh() require.True(t, ok) - event.Consume(func(event []int) error { - require.Equal(t, []int{40}, event) + event.Consume(func(event int) error { + require.Equal(t, 40, event) return nil }) @@ -88,7 +88,7 @@ func TestChanneledSubscriber_ErrorReported(t *testing.T) { defer wg.Done() // Send one event, that succeeds. - err := subscriber.handle(context.Background(), []int{30}) + err := subscriber.handle(context.Background(), 30) require.Error(t, err) require.Equal(t, reportedErr, err) }() @@ -96,8 +96,8 @@ func TestChanneledSubscriber_ErrorReported(t *testing.T) { // Receive first event. Notify success. event, ok := <-subscriber.OnEventCh() require.True(t, ok) - event.Consume(func(event []int) error { - require.Equal(t, []int{30}, event) + event.Consume(func(event int) error { + require.Equal(t, 30, event) return reportedErr })