mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-16 15:16:44 +00:00
Revert "GODT-2014: bridge quit if gRPC client ends stream."
This reverts commit 8ca849b7a8.
This commit is contained in:
@ -59,13 +59,12 @@ const (
|
||||
// Service is the RPC service struct.
|
||||
type Service struct { // nolint:structcheck
|
||||
UnimplementedBridgeServer
|
||||
grpcServer *grpc.Server // the gGRPC server
|
||||
listener net.Listener
|
||||
eventStreamCh chan *StreamEvent
|
||||
eventStreamChMutex sync.RWMutex
|
||||
eventStreamDoneCh chan struct{}
|
||||
eventQueue []*StreamEvent
|
||||
eventQueueMutex sync.Mutex
|
||||
grpcServer *grpc.Server // the gGRPC server
|
||||
listener net.Listener
|
||||
eventStreamCh chan *StreamEvent
|
||||
eventStreamDoneCh chan struct{}
|
||||
eventQueue []*StreamEvent
|
||||
eventQueueMutex sync.Mutex
|
||||
|
||||
panicHandler types.PanicHandler
|
||||
eventListener listener.Listener
|
||||
|
||||
@ -95,15 +95,12 @@ func (s *Service) GuiReady(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empt
|
||||
// Quit implement the Quit gRPC service call.
|
||||
func (s *Service) Quit(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
|
||||
s.log.Debug("Quit")
|
||||
return &emptypb.Empty{}, s.quit()
|
||||
}
|
||||
|
||||
func (s *Service) quit() error {
|
||||
// Windows is notably slow at Quitting. We do it in a goroutine to speed things up a bit.
|
||||
go func() {
|
||||
var err error
|
||||
if s.isStreamingEvents() {
|
||||
if err = s.stopEventStream(); err != nil {
|
||||
if s.eventStreamCh != nil {
|
||||
if _, err = s.StopEventStream(ctx, empty); err != nil {
|
||||
s.log.WithError(err).Error("Quit failed.")
|
||||
}
|
||||
}
|
||||
@ -112,7 +109,7 @@ func (s *Service) quit() error {
|
||||
s.grpcServer.GracefulStop()
|
||||
}()
|
||||
|
||||
return nil
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
// Restart implement the Restart gRPC service call.
|
||||
|
||||
@ -29,19 +29,19 @@ import (
|
||||
func (s *Service) RunEventStream(request *EventStreamRequest, server Bridge_RunEventStreamServer) error {
|
||||
s.log.Debug("Starting Event stream")
|
||||
|
||||
if s.isStreamingEvents() {
|
||||
if s.eventStreamCh != nil {
|
||||
return status.Errorf(codes.AlreadyExists, "the service is already streaming") // TO-DO GODT-1667 decide if we want to kill the existing stream.
|
||||
}
|
||||
|
||||
s.bridge.SetCurrentPlatform(request.ClientPlatform)
|
||||
|
||||
s.createEventStreamChannel()
|
||||
s.eventStreamCh = make(chan *StreamEvent)
|
||||
s.eventStreamDoneCh = make(chan struct{})
|
||||
|
||||
// TO-DO GODT-1667 We should have a safer we to close this channel? What if an event occur while we are closing?
|
||||
defer func() {
|
||||
close(s.eventStreamCh)
|
||||
s.deleteEventStreamChannel()
|
||||
s.eventStreamCh = nil
|
||||
close(s.eventStreamDoneCh)
|
||||
s.eventStreamDoneCh = nil
|
||||
}()
|
||||
@ -70,34 +70,24 @@ func (s *Service) RunEventStream(request *EventStreamRequest, server Bridge_RunE
|
||||
s.log.Debug("Stop Event stream")
|
||||
return err
|
||||
}
|
||||
case <-server.Context().Done():
|
||||
s.log.Debug("Client closed the stream, exiting")
|
||||
return s.quit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StopEventStream stops the event stream.
|
||||
func (s *Service) StopEventStream(_ context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
|
||||
return &emptypb.Empty{}, s.stopEventStream()
|
||||
}
|
||||
|
||||
func (s *Service) stopEventStream() error {
|
||||
s.eventStreamChMutex.RLock()
|
||||
defer s.eventStreamChMutex.RUnlock()
|
||||
|
||||
func (s *Service) StopEventStream(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
|
||||
if s.eventStreamCh == nil {
|
||||
return status.Errorf(codes.NotFound, "The service is not streaming")
|
||||
return nil, status.Errorf(codes.NotFound, "The service is not streaming")
|
||||
}
|
||||
|
||||
s.eventStreamDoneCh <- struct{}{}
|
||||
|
||||
return nil
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
// SendEvent sends an event to the via the gRPC event stream.
|
||||
func (s *Service) SendEvent(event *StreamEvent) error {
|
||||
if !s.isStreamingEvents() { // nobody is connected to the event stream, we queue events
|
||||
if s.eventStreamCh == nil { // nobody is connected to the event stream, we queue events
|
||||
s.queueEvent(event)
|
||||
return nil
|
||||
}
|
||||
@ -184,24 +174,3 @@ func (s *Service) queueEvent(event *StreamEvent) {
|
||||
s.eventQueue = append(s.eventQueue, event)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) isStreamingEvents() bool {
|
||||
s.eventStreamChMutex.RLock()
|
||||
defer s.eventStreamChMutex.RUnlock()
|
||||
|
||||
return s.eventStreamCh != nil
|
||||
}
|
||||
|
||||
func (s *Service) createEventStreamChannel() {
|
||||
s.eventStreamChMutex.Lock()
|
||||
defer s.eventStreamChMutex.Unlock()
|
||||
|
||||
s.eventStreamCh = make(chan *StreamEvent)
|
||||
}
|
||||
|
||||
func (s *Service) deleteEventStreamChannel() {
|
||||
s.eventStreamChMutex.Lock()
|
||||
defer s.eventStreamChMutex.Unlock()
|
||||
|
||||
s.eventStreamCh = nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user