GODT-2014: bridge quit if gRPC client ends stream (v3)

This commit is contained in:
James Houlahan
2022-11-04 01:18:41 +01:00
parent 8b5cb7729c
commit 8bb60afabd
3 changed files with 50 additions and 15 deletions

View File

@ -29,19 +29,19 @@ import (
func (s *Service) RunEventStream(request *EventStreamRequest, server Bridge_RunEventStreamServer) error {
s.log.Debug("Starting Event stream")
if s.eventStreamCh != nil {
if s.isStreamingEvents() {
return status.Errorf(codes.AlreadyExists, "the service is already streaming") // TO-DO GODT-1667 decide if we want to kill the existing stream.
}
s.bridge.SetCurrentPlatform(request.ClientPlatform)
s.eventStreamCh = make(chan *StreamEvent)
s.createEventStreamChannel()
s.eventStreamDoneCh = make(chan struct{})
// TO-DO GODT-1667 We should have a safer we to close this channel? What if an event occur while we are closing?
defer func() {
close(s.eventStreamCh)
s.eventStreamCh = nil
s.deleteEventStreamChannel()
close(s.eventStreamDoneCh)
s.eventStreamDoneCh = nil
}()
@ -70,24 +70,34 @@ func (s *Service) RunEventStream(request *EventStreamRequest, server Bridge_RunE
s.log.Debug("Stop Event stream")
return err
}
case <-server.Context().Done():
s.log.Debug("Client closed the stream, exiting")
return s.quit()
}
}
}
// StopEventStream stops the event stream.
func (s *Service) StopEventStream(_ context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
return &emptypb.Empty{}, s.stopEventStream()
}
func (s *Service) stopEventStream() error {
s.eventStreamChMutex.RLock()
defer s.eventStreamChMutex.RUnlock()
if s.eventStreamCh == nil {
return nil, status.Errorf(codes.NotFound, "The service is not streaming")
return status.Errorf(codes.NotFound, "The service is not streaming")
}
s.eventStreamDoneCh <- struct{}{}
return &emptypb.Empty{}, nil
return nil
}
// SendEvent sends an event to the via the gRPC event stream.
func (s *Service) SendEvent(event *StreamEvent) error {
if s.eventStreamCh == nil { // nobody is connected to the event stream, we queue events
if !s.isStreamingEvents() { // nobody is connected to the event stream, we queue events
s.queueEvent(event)
return nil
}
@ -173,3 +183,24 @@ func (s *Service) queueEvent(event *StreamEvent) {
s.eventQueue = append(s.eventQueue, event)
}
}
func (s *Service) isStreamingEvents() bool {
s.eventStreamChMutex.RLock()
defer s.eventStreamChMutex.RUnlock()
return s.eventStreamCh != nil
}
func (s *Service) createEventStreamChannel() {
s.eventStreamChMutex.Lock()
defer s.eventStreamChMutex.Unlock()
s.eventStreamCh = make(chan *StreamEvent)
}
func (s *Service) deleteEventStreamChannel() {
s.eventStreamChMutex.Lock()
defer s.eventStreamChMutex.Unlock()
s.eventStreamCh = nil
}