GODT-1932: frontend is instantiated before bridge.

WIP: introduced frontend.Type.
WIP: frontend is create before bridge is instantiated.
WIP: filtering of internet stastus event in gRPC event queue.
This commit is contained in:
Xavier Michelon
2022-10-10 17:32:08 +02:00
parent 9a3900114b
commit db2379e2fd
7 changed files with 140 additions and 61 deletions

View File

@ -0,0 +1,32 @@
// Copyright (c) 2022 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package grpc
import "github.com/bradenaw/juniper/xslices"
// isInternetStatus returns true iff the event is InternetStatus.
func (x *StreamEvent) isInternetStatus() bool {
appEvent := x.GetApp()
return (appEvent != nil) && (appEvent.GetInternetStatus() != nil)
}
// filterOutInternetStatusEvents return a copy of the events list where all internet connection events have been removed.
func filterOutInternetStatusEvents(events []*StreamEvent) []*StreamEvent {
return xslices.Filter(events, func(event *StreamEvent) bool { return !event.isInternetStatus() })
}

View File

@ -53,3 +53,25 @@ func TestConfig(t *testing.T) {
// failure to save
require.Error(t, conf2.save(filepath.Join(tempDir, "non/existing/folder", tempFileName)))
}
func TestIsInternetStatus(t *testing.T) {
require.True(t, NewInternetStatusEvent(true).isInternetStatus())
require.True(t, NewInternetStatusEvent(false).isInternetStatus())
require.False(t, NewKeychainHasNoKeychainEvent().isInternetStatus())
require.False(t, NewLoginAlreadyLoggedInEvent("").isInternetStatus())
}
func TestFilterOutInternetStatusEvents(t *testing.T) {
require.Zero(t, len(filterOutInternetStatusEvents([]*StreamEvent{})))
off := NewInternetStatusEvent(false)
on := NewInternetStatusEvent(true)
show := NewShowMainWindowEvent()
finished := NewLoginFinishedEvent("id")
require.Zero(t, len(filterOutInternetStatusEvents([]*StreamEvent{})))
require.Zero(t, len(filterOutInternetStatusEvents([]*StreamEvent{off, on, off})))
require.Equal(t, filterOutInternetStatusEvents([]*StreamEvent{off, show, on}), []*StreamEvent{show})
require.Equal(t, filterOutInternetStatusEvents([]*StreamEvent{finished, off, show, on}), []*StreamEvent{finished, show})
require.Equal(t, filterOutInternetStatusEvents([]*StreamEvent{finished, show}), []*StreamEvent{finished, show})
}

View File

@ -90,7 +90,6 @@ func NewService(
panicHandler types.PanicHandler,
eventListener listener.Listener,
updater types.Updater,
bridge types.Bridger,
restarter types.Restarter,
locations *locations.Locations,
) *Service {
@ -99,7 +98,6 @@ func NewService(
panicHandler: panicHandler,
eventListener: eventListener,
updater: updater,
bridge: bridge,
restarter: restarter,
showOnStartup: showOnStartup,
@ -115,6 +113,15 @@ func NewService(
// set to 1
s.initializing.Add(1)
go func() {
defer s.panicHandler.HandlePanic()
s.watchEvents()
}()
return &s
}
func (s *Service) startGRPCServer() {
tlsConfig, pemCert, err := s.generateTLSConfig()
if err != nil {
s.log.WithError(err).Panic("could not generate gRPC TLS config")
@ -122,10 +129,9 @@ func NewService(
s.pemCert = string(pemCert)
s.initAutostart()
s.grpcServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
RegisterBridgeServer(s.grpcServer, &s)
RegisterBridgeServer(s.grpcServer, s)
s.listener, err = net.Listen("tcp", "127.0.0.1:0") // Port 0 means that the port is randomly picked by the system.
if err != nil {
@ -137,8 +143,6 @@ func NewService(
}
s.log.Info("gRPC server listening at ", s.listener.Addr())
return &s
}
func (s *Service) initAutostart() {
@ -158,15 +162,18 @@ func (s *Service) initAutostart() {
})
}
func (s *Service) Loop() error {
func (s *Service) Loop(b types.Bridger) error {
s.bridge = b
s.initAutostart()
s.startGRPCServer()
defer func() {
s.bridge.SetBool(settings.FirstStartGUIKey, false)
}()
go func() {
defer s.panicHandler.HandlePanic()
s.watchEvents()
}()
if s.bridge.HasError(bridge.ErrLocalCacheUnavailable) {
_ = s.SendEvent(NewCacheErrorEvent(CacheErrorType_CACHE_UNAVAILABLE_ERROR))
}
err := s.grpcServer.Serve(s.listener)
if err != nil {
@ -203,10 +210,6 @@ func (s *Service) WaitUntilFrontendIsReady() {
}
func (s *Service) watchEvents() { // nolint:funlen
if s.bridge.HasError(bridge.ErrLocalCacheUnavailable) {
_ = s.SendEvent(NewCacheErrorEvent(CacheErrorType_CACHE_UNAVAILABLE_ERROR))
}
errorCh := s.eventListener.ProvideChannel(events.ErrorEvent)
credentialsErrorCh := s.eventListener.ProvideChannel(events.CredentialsErrorEvent)
noActiveKeyForRecipientCh := s.eventListener.ProvideChannel(events.NoActiveKeyForRecipientEvent)
@ -258,6 +261,10 @@ func (s *Service) watchEvents() { // nolint:funlen
case address := <-addressChangedLogoutCh:
_ = s.SendEvent(NewMailAddressChangeLogoutEvent(address))
case userID := <-logoutCh:
if s.bridge == nil {
logrus.Error("Received a logout event but bridge is not yet instantiated.")
break
}
user, err := s.bridge.GetUserInfo(userID)
if err != nil {
return

View File

@ -95,12 +95,8 @@ func (s *Service) StopEventStream(ctx context.Context, _ *emptypb.Empty) (*empty
// SendEvent sends an event to the via the gRPC event stream.
func (s *Service) SendEvent(event *StreamEvent) error {
s.eventQueueMutex.Lock()
defer s.eventQueueMutex.Unlock()
if s.eventStreamCh == nil {
// nobody is connected to the event stream, we queue events
s.eventQueue = append(s.eventQueue, event)
if s.eventStreamCh == nil { // nobody is connected to the event stream, we queue events
s.queueEvent(event)
return nil
}
@ -175,3 +171,14 @@ func (s *Service) StartEventTest() error { //nolint:funlen
return nil
}
func (s *Service) queueEvent(event *StreamEvent) {
s.eventQueueMutex.Lock()
defer s.eventQueueMutex.Unlock()
if event.isInternetStatus() {
s.eventQueue = append(filterOutInternetStatusEvents(s.eventQueue), event)
} else {
s.eventQueue = append(s.eventQueue, event)
}
}