diff --git a/internal/frontend/grpc/service.go b/internal/frontend/grpc/service.go index ed5c918d..00d7aebc 100644 --- a/internal/frontend/grpc/service.go +++ b/internal/frontend/grpc/service.go @@ -59,13 +59,12 @@ const ( // Service is the RPC service struct. type Service struct { // nolint:structcheck UnimplementedBridgeServer - grpcServer *grpc.Server // the gGRPC server - listener net.Listener - eventStreamCh chan *StreamEvent - eventStreamChMutex sync.RWMutex - eventStreamDoneCh chan struct{} - eventQueue []*StreamEvent - eventQueueMutex sync.Mutex + grpcServer *grpc.Server // the gGRPC server + listener net.Listener + eventStreamCh chan *StreamEvent + eventStreamDoneCh chan struct{} + eventQueue []*StreamEvent + eventQueueMutex sync.Mutex panicHandler types.PanicHandler eventListener listener.Listener diff --git a/internal/frontend/grpc/service_methods.go b/internal/frontend/grpc/service_methods.go index 283537e9..fde708a1 100644 --- a/internal/frontend/grpc/service_methods.go +++ b/internal/frontend/grpc/service_methods.go @@ -95,15 +95,12 @@ func (s *Service) GuiReady(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empt // Quit implement the Quit gRPC service call. func (s *Service) Quit(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) { s.log.Debug("Quit") - return &emptypb.Empty{}, s.quit() -} -func (s *Service) quit() error { // Windows is notably slow at Quitting. We do it in a goroutine to speed things up a bit. go func() { var err error - if s.isStreamingEvents() { - if err = s.stopEventStream(); err != nil { + if s.eventStreamCh != nil { + if _, err = s.StopEventStream(ctx, empty); err != nil { s.log.WithError(err).Error("Quit failed.") } } @@ -112,7 +109,7 @@ func (s *Service) quit() error { s.grpcServer.GracefulStop() }() - return nil + return &emptypb.Empty{}, nil } // Restart implement the Restart gRPC service call. diff --git a/internal/frontend/grpc/service_stream.go b/internal/frontend/grpc/service_stream.go index 4d079619..f32511fd 100644 --- a/internal/frontend/grpc/service_stream.go +++ b/internal/frontend/grpc/service_stream.go @@ -29,19 +29,19 @@ import ( func (s *Service) RunEventStream(request *EventStreamRequest, server Bridge_RunEventStreamServer) error { s.log.Debug("Starting Event stream") - if s.isStreamingEvents() { + if s.eventStreamCh != nil { return status.Errorf(codes.AlreadyExists, "the service is already streaming") // TO-DO GODT-1667 decide if we want to kill the existing stream. } s.bridge.SetCurrentPlatform(request.ClientPlatform) - s.createEventStreamChannel() + s.eventStreamCh = make(chan *StreamEvent) s.eventStreamDoneCh = make(chan struct{}) // TO-DO GODT-1667 We should have a safer we to close this channel? What if an event occur while we are closing? defer func() { close(s.eventStreamCh) - s.deleteEventStreamChannel() + s.eventStreamCh = nil close(s.eventStreamDoneCh) s.eventStreamDoneCh = nil }() @@ -70,34 +70,24 @@ func (s *Service) RunEventStream(request *EventStreamRequest, server Bridge_RunE s.log.Debug("Stop Event stream") return err } - case <-server.Context().Done(): - s.log.Debug("Client closed the stream, exiting") - return s.quit() } } } // StopEventStream stops the event stream. -func (s *Service) StopEventStream(_ context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) { - return &emptypb.Empty{}, s.stopEventStream() -} - -func (s *Service) stopEventStream() error { - s.eventStreamChMutex.RLock() - defer s.eventStreamChMutex.RUnlock() - +func (s *Service) StopEventStream(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) { if s.eventStreamCh == nil { - return status.Errorf(codes.NotFound, "The service is not streaming") + return nil, status.Errorf(codes.NotFound, "The service is not streaming") } s.eventStreamDoneCh <- struct{}{} - return nil + return &emptypb.Empty{}, nil } // SendEvent sends an event to the via the gRPC event stream. func (s *Service) SendEvent(event *StreamEvent) error { - if !s.isStreamingEvents() { // nobody is connected to the event stream, we queue events + if s.eventStreamCh == nil { // nobody is connected to the event stream, we queue events s.queueEvent(event) return nil } @@ -184,24 +174,3 @@ func (s *Service) queueEvent(event *StreamEvent) { s.eventQueue = append(s.eventQueue, event) } } - -func (s *Service) isStreamingEvents() bool { - s.eventStreamChMutex.RLock() - defer s.eventStreamChMutex.RUnlock() - - return s.eventStreamCh != nil -} - -func (s *Service) createEventStreamChannel() { - s.eventStreamChMutex.Lock() - defer s.eventStreamChMutex.Unlock() - - s.eventStreamCh = make(chan *StreamEvent) -} - -func (s *Service) deleteEventStreamChannel() { - s.eventStreamChMutex.Lock() - defer s.eventStreamChMutex.Unlock() - - s.eventStreamCh = nil -}