forked from Silverfish/proton-bridge
fix(GODT-2800): ChanneledSubcriber types
This commit is contained in:
@ -226,7 +226,7 @@ func TestService_UnsubscribeBeforeHandlingEventIsNotConsideredError(t *testing.T
|
|||||||
eventPublisher := mocks2.NewMockEventPublisher(mockCtrl)
|
eventPublisher := mocks2.NewMockEventPublisher(mockCtrl)
|
||||||
eventIDStore := mocks.NewMockEventIDStore(mockCtrl)
|
eventIDStore := mocks.NewMockEventIDStore(mockCtrl)
|
||||||
eventSource := mocks.NewMockEventSource(mockCtrl)
|
eventSource := mocks.NewMockEventSource(mockCtrl)
|
||||||
subscriber := newChanneledSubscriber[proton.MessageEvent]("My subscriber")
|
subscriber := NewMessageSubscriber("My subscriber")
|
||||||
|
|
||||||
firstEventID := "EVENT01"
|
firstEventID := "EVENT01"
|
||||||
secondEventID := "EVENT02"
|
secondEventID := "EVENT02"
|
||||||
|
|||||||
@ -31,27 +31,34 @@ import (
|
|||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewMessageSubscriber(name string) *ChanneledSubscriber[proton.MessageEvent] {
|
type AddressChanneledSubscriber = ChanneledSubscriber[[]proton.AddressEvent]
|
||||||
return newChanneledSubscriber[proton.MessageEvent](name)
|
type LabelChanneledSubscriber = ChanneledSubscriber[[]proton.LabelEvent]
|
||||||
|
type MessageChanneledSubscriber = ChanneledSubscriber[[]proton.MessageEvent]
|
||||||
|
type UserChanneledSubscriber = ChanneledSubscriber[proton.User]
|
||||||
|
type RefreshChanneledSubscriber = ChanneledSubscriber[proton.RefreshFlag]
|
||||||
|
type UserUsedSpaceChanneledSubscriber = ChanneledSubscriber[int]
|
||||||
|
|
||||||
|
func NewMessageSubscriber(name string) *MessageChanneledSubscriber {
|
||||||
|
return newChanneledSubscriber[[]proton.MessageEvent](name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAddressSubscriber(name string) *ChanneledSubscriber[proton.AddressEvent] {
|
func NewAddressSubscriber(name string) *AddressChanneledSubscriber {
|
||||||
return newChanneledSubscriber[proton.AddressEvent](name)
|
return newChanneledSubscriber[[]proton.AddressEvent](name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLabelSubscriber(name string) *ChanneledSubscriber[proton.LabelEvent] {
|
func NewLabelSubscriber(name string) *LabelChanneledSubscriber {
|
||||||
return newChanneledSubscriber[proton.LabelEvent](name)
|
return newChanneledSubscriber[[]proton.LabelEvent](name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRefreshSubscriber(name string) *ChanneledSubscriber[struct{}] {
|
func NewRefreshSubscriber(name string) *RefreshChanneledSubscriber {
|
||||||
return newChanneledSubscriber[struct{}](name)
|
return newChanneledSubscriber[proton.RefreshFlag](name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUserSubscriber(name string) *ChanneledSubscriber[proton.User] {
|
func NewUserSubscriber(name string) *UserChanneledSubscriber {
|
||||||
return newChanneledSubscriber[proton.User](name)
|
return newChanneledSubscriber[proton.User](name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUserUsedSpaceSubscriber(name string) *ChanneledSubscriber[int] {
|
func NewUserUsedSpaceSubscriber(name string) *UserUsedSpaceChanneledSubscriber {
|
||||||
return newChanneledSubscriber[int](name)
|
return newChanneledSubscriber[int](name)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -193,11 +200,11 @@ func newChanneledSubscriber[T any](name string) *ChanneledSubscriber[T] {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type ChanneledSubscriberEvent[T any] struct {
|
type ChanneledSubscriberEvent[T any] struct {
|
||||||
data []T
|
data T
|
||||||
response chan error
|
response chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c ChanneledSubscriberEvent[T]) Consume(f func([]T) error) {
|
func (c ChanneledSubscriberEvent[T]) Consume(f func(T) error) {
|
||||||
if err := f(c.data); err != nil {
|
if err := f(c.data); err != nil {
|
||||||
c.response <- err
|
c.response <- err
|
||||||
}
|
}
|
||||||
@ -208,7 +215,7 @@ func (c *ChanneledSubscriber[T]) name() string { //nolint:unused
|
|||||||
return c.id
|
return c.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ChanneledSubscriber[T]) handle(ctx context.Context, event []T) error { //nolint:unused
|
func (c *ChanneledSubscriber[T]) handle(ctx context.Context, event T) error { //nolint:unused
|
||||||
data := &ChanneledSubscriberEvent[T]{
|
data := &ChanneledSubscriberEvent[T]{
|
||||||
data: event,
|
data: event,
|
||||||
response: make(chan error),
|
response: make(chan error),
|
||||||
@ -246,7 +253,7 @@ func (c *ChanneledSubscriber[T]) cancel() { //nolint:unused
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
e.Consume(func(_ []T) error { return nil })
|
e.Consume(func(_ T) error { return nil })
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -39,13 +39,13 @@ func TestChanneledSubscriber_CtxTimeoutDoesNotBlockFutureEvents(t *testing.T) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
// Send one event, that succeeds.
|
// Send one event, that succeeds.
|
||||||
require.NoError(t, subscriber.handle(context.Background(), []int{30}))
|
require.NoError(t, subscriber.handle(context.Background(), 30))
|
||||||
|
|
||||||
// Add an impossible deadline that fails immediately to simulate on event taking too long to send.
|
// Add an impossible deadline that fails immediately to simulate on event taking too long to send.
|
||||||
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Microsecond))
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Microsecond))
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
err := subscriber.handle(ctx, []int{20})
|
err := subscriber.handle(ctx, 20)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.True(t, errors.Is(err, context.DeadlineExceeded))
|
require.True(t, errors.Is(err, context.DeadlineExceeded))
|
||||||
}()
|
}()
|
||||||
@ -53,8 +53,8 @@ func TestChanneledSubscriber_CtxTimeoutDoesNotBlockFutureEvents(t *testing.T) {
|
|||||||
// Receive first event. Notify success.
|
// Receive first event. Notify success.
|
||||||
event, ok := <-subscriber.OnEventCh()
|
event, ok := <-subscriber.OnEventCh()
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
event.Consume(func(event []int) error {
|
event.Consume(func(event int) error {
|
||||||
require.Equal(t, []int{30}, event)
|
require.Equal(t, 30, event)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -63,13 +63,13 @@ func TestChanneledSubscriber_CtxTimeoutDoesNotBlockFutureEvents(t *testing.T) {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
require.NoError(t, subscriber.handle(context.Background(), []int{40}))
|
require.NoError(t, subscriber.handle(context.Background(), 40))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
event, ok = <-subscriber.OnEventCh()
|
event, ok = <-subscriber.OnEventCh()
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
event.Consume(func(event []int) error {
|
event.Consume(func(event int) error {
|
||||||
require.Equal(t, []int{40}, event)
|
require.Equal(t, 40, event)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -88,7 +88,7 @@ func TestChanneledSubscriber_ErrorReported(t *testing.T) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
// Send one event, that succeeds.
|
// Send one event, that succeeds.
|
||||||
err := subscriber.handle(context.Background(), []int{30})
|
err := subscriber.handle(context.Background(), 30)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Equal(t, reportedErr, err)
|
require.Equal(t, reportedErr, err)
|
||||||
}()
|
}()
|
||||||
@ -96,8 +96,8 @@ func TestChanneledSubscriber_ErrorReported(t *testing.T) {
|
|||||||
// Receive first event. Notify success.
|
// Receive first event. Notify success.
|
||||||
event, ok := <-subscriber.OnEventCh()
|
event, ok := <-subscriber.OnEventCh()
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
event.Consume(func(event []int) error {
|
event.Consume(func(event int) error {
|
||||||
require.Equal(t, []int{30}, event)
|
require.Equal(t, 30, event)
|
||||||
return reportedErr
|
return reportedErr
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user