diff --git a/internal/frontend/grpc/service.go b/internal/frontend/grpc/service.go index e082b6c1..5f53ef15 100644 --- a/internal/frontend/grpc/service.go +++ b/internal/frontend/grpc/service.go @@ -59,12 +59,13 @@ const ( // Service is the RPC service struct. type Service struct { // nolint:structcheck UnimplementedBridgeServer - grpcServer *grpc.Server // the gGRPC server - listener net.Listener - eventStreamCh chan *StreamEvent - eventStreamDoneCh chan struct{} - eventQueue []*StreamEvent - eventQueueMutex sync.Mutex + grpcServer *grpc.Server // the gGRPC server + listener net.Listener + eventStreamCh chan *StreamEvent + eventStreamChMutex sync.RWMutex + eventStreamDoneCh chan struct{} + eventQueue []*StreamEvent + eventQueueMutex sync.Mutex panicHandler types.PanicHandler eventListener listener.Listener diff --git a/internal/frontend/grpc/service_methods.go b/internal/frontend/grpc/service_methods.go index fde708a1..283537e9 100644 --- a/internal/frontend/grpc/service_methods.go +++ b/internal/frontend/grpc/service_methods.go @@ -95,12 +95,15 @@ func (s *Service) GuiReady(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empt // Quit implement the Quit gRPC service call. func (s *Service) Quit(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) { s.log.Debug("Quit") + return &emptypb.Empty{}, s.quit() +} +func (s *Service) quit() error { // Windows is notably slow at Quitting. We do it in a goroutine to speed things up a bit. go func() { var err error - if s.eventStreamCh != nil { - if _, err = s.StopEventStream(ctx, empty); err != nil { + if s.isStreamingEvents() { + if err = s.stopEventStream(); err != nil { s.log.WithError(err).Error("Quit failed.") } } @@ -109,7 +112,7 @@ func (s *Service) Quit(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empt s.grpcServer.GracefulStop() }() - return &emptypb.Empty{}, nil + return nil } // Restart implement the Restart gRPC service call. diff --git a/internal/frontend/grpc/service_stream.go b/internal/frontend/grpc/service_stream.go index f32511fd..4d079619 100644 --- a/internal/frontend/grpc/service_stream.go +++ b/internal/frontend/grpc/service_stream.go @@ -29,19 +29,19 @@ import ( func (s *Service) RunEventStream(request *EventStreamRequest, server Bridge_RunEventStreamServer) error { s.log.Debug("Starting Event stream") - if s.eventStreamCh != nil { + if s.isStreamingEvents() { return status.Errorf(codes.AlreadyExists, "the service is already streaming") // TO-DO GODT-1667 decide if we want to kill the existing stream. } s.bridge.SetCurrentPlatform(request.ClientPlatform) - s.eventStreamCh = make(chan *StreamEvent) + s.createEventStreamChannel() s.eventStreamDoneCh = make(chan struct{}) // TO-DO GODT-1667 We should have a safer we to close this channel? What if an event occur while we are closing? defer func() { close(s.eventStreamCh) - s.eventStreamCh = nil + s.deleteEventStreamChannel() close(s.eventStreamDoneCh) s.eventStreamDoneCh = nil }() @@ -70,24 +70,34 @@ func (s *Service) RunEventStream(request *EventStreamRequest, server Bridge_RunE s.log.Debug("Stop Event stream") return err } + case <-server.Context().Done(): + s.log.Debug("Client closed the stream, exiting") + return s.quit() } } } // StopEventStream stops the event stream. -func (s *Service) StopEventStream(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) { +func (s *Service) StopEventStream(_ context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) { + return &emptypb.Empty{}, s.stopEventStream() +} + +func (s *Service) stopEventStream() error { + s.eventStreamChMutex.RLock() + defer s.eventStreamChMutex.RUnlock() + if s.eventStreamCh == nil { - return nil, status.Errorf(codes.NotFound, "The service is not streaming") + return status.Errorf(codes.NotFound, "The service is not streaming") } s.eventStreamDoneCh <- struct{}{} - return &emptypb.Empty{}, nil + return nil } // SendEvent sends an event to the via the gRPC event stream. func (s *Service) SendEvent(event *StreamEvent) error { - if s.eventStreamCh == nil { // nobody is connected to the event stream, we queue events + if !s.isStreamingEvents() { // nobody is connected to the event stream, we queue events s.queueEvent(event) return nil } @@ -174,3 +184,24 @@ func (s *Service) queueEvent(event *StreamEvent) { s.eventQueue = append(s.eventQueue, event) } } + +func (s *Service) isStreamingEvents() bool { + s.eventStreamChMutex.RLock() + defer s.eventStreamChMutex.RUnlock() + + return s.eventStreamCh != nil +} + +func (s *Service) createEventStreamChannel() { + s.eventStreamChMutex.Lock() + defer s.eventStreamChMutex.Unlock() + + s.eventStreamCh = make(chan *StreamEvent) +} + +func (s *Service) deleteEventStreamChannel() { + s.eventStreamChMutex.Lock() + defer s.eventStreamChMutex.Unlock() + + s.eventStreamCh = nil +}