From db2379e2fdf4f205a633fb24a994c77f6c1f2c8f Mon Sep 17 00:00:00 2001 From: Xavier Michelon Date: Mon, 10 Oct 2022 17:32:08 +0200 Subject: [PATCH] GODT-1932: frontend is instantiated before bridge. WIP: introduced frontend.Type. WIP: frontend is create before bridge is instantiated. WIP: filtering of internet stastus event in gRPC event queue. --- internal/app/bridge/bridge.go | 60 ++++++++++--------- internal/frontend/cli/frontend.go | 5 +- internal/frontend/frontend.go | 26 +++++--- internal/frontend/grpc/event_utils.go | 32 ++++++++++ .../grpc/{config_test.go => grpc_test.go} | 22 +++++++ internal/frontend/grpc/service.go | 37 +++++++----- internal/frontend/grpc/service_stream.go | 19 ++++-- 7 files changed, 140 insertions(+), 61 deletions(-) create mode 100644 internal/frontend/grpc/event_utils.go rename internal/frontend/grpc/{config_test.go => grpc_test.go} (57%) diff --git a/internal/app/bridge/bridge.go b/internal/app/bridge/bridge.go index c20f9536..0f58d5bf 100644 --- a/internal/app/bridge/bridge.go +++ b/internal/app/bridge/bridge.go @@ -44,9 +44,9 @@ const ( flagLogSMTP = "log-smtp" flagNonInteractive = "noninteractive" - // Memory cache was estimated by empirical usage in past and it was set to 100MB. + // Memory cache was estimated by empirical usage in the past, and it was set to 100MB. // NOTE: This value must not be less than maximal size of one email (~30MB). - inMemoryCacheLimnit = 100 * (1 << 20) + inMemoryCacheLimit = 100 * (1 << 20) ) func New(base *base.Base) *cli.App { @@ -63,7 +63,7 @@ func New(base *base.Base) *cli.App { }, &cli.BoolFlag{ Name: flagNonInteractive, - Usage: "Start Bridge entirely noninteractively", + Usage: "Start Bridge entirely non-interactively", }, }...) @@ -71,6 +71,17 @@ func New(base *base.Base) *cli.App { } func main(b *base.Base, c *cli.Context) error { //nolint:funlen + frontendType := getFrontendTypeFromCLIParams(c) + f := frontend.New( + frontendType, + !c.Bool(base.FlagNoWindow), + b.CrashHandler, + b.Listener, + b.Updater, + b, + b.Locations, + ) + cache, cacheErr := loadMessageCache(b) if cacheErr != nil { logrus.WithError(cacheErr).Error("Could not load local cache.") @@ -141,28 +152,10 @@ func main(b *base.Base, c *cli.Context) error { //nolint:funlen // We want cookies to be saved to disk so they are loaded the next time. b.AddTeardownAction(b.CookieJar.PersistCookies) - var frontendMode string - - switch { - case c.Bool(base.FlagCLI): - frontendMode = "cli" - case c.Bool(flagNonInteractive): - return <-(make(chan error)) // Block forever. - default: - frontendMode = "grpc" + if frontendType == frontend.NonInteractive { + return <-(make(chan error)) } - f := frontend.New( - frontendMode, - !c.Bool(base.FlagNoWindow), - b.CrashHandler, - b.Listener, - b.Updater, - bridge, - b, - b.Locations, - ) - // Watch for updates routine go func() { ticker := time.NewTicker(constants.UpdateCheckInterval) @@ -173,7 +166,18 @@ func main(b *base.Base, c *cli.Context) error { //nolint:funlen } }() - return f.Loop() + return f.Loop(bridge) +} + +func getFrontendTypeFromCLIParams(c *cli.Context) frontend.Type { + switch { + case c.Bool(base.FlagCLI): + return frontend.CLI + case c.Bool(flagNonInteractive): + return frontend.NonInteractive + default: + return frontend.GRPC + } } func checkAndHandleUpdate(u types.Updater, f frontend.Frontend, autoUpdate bool) { @@ -226,7 +230,7 @@ func checkAndHandleUpdate(u types.Updater, f frontend.Frontend, autoUpdate bool) // local cache is enabled but unavailable (in-memory cache will be returned nevertheless). func loadMessageCache(b *base.Base) (cache.Cache, error) { if !b.Settings.GetBool(settings.CacheEnabledKey) { - return cache.NewInMemoryCache(inMemoryCacheLimnit), nil + return cache.NewInMemoryCache(inMemoryCacheLimit), nil } var compressor cache.Compressor @@ -246,12 +250,12 @@ func loadMessageCache(b *base.Base) (cache.Cache, error) { path = customPath } else { path = b.Cache.GetDefaultMessageCacheDir() - // Store path so it will allways persist if default location + // Store path so it will always persist if default location // will be changed in new version. b.Settings.Set(settings.CacheLocationKey, path) } - // To prevent memory peaks we set maximal write concurency for store + // To prevent memory peaks we set maximal write concurrency for store // build jobs. store.SetBuildAndCacheJobLimit(b.Settings.GetInt(settings.CacheConcurrencyWrite)) @@ -262,7 +266,7 @@ func loadMessageCache(b *base.Base) (cache.Cache, error) { ConcurrentWrite: b.Settings.GetInt(settings.CacheConcurrencyWrite), }) if err != nil { - return cache.NewInMemoryCache(inMemoryCacheLimnit), err + return cache.NewInMemoryCache(inMemoryCacheLimit), err } return messageCache, nil diff --git a/internal/frontend/cli/frontend.go b/internal/frontend/cli/frontend.go index 10ad573c..1597ea80 100644 --- a/internal/frontend/cli/frontend.go +++ b/internal/frontend/cli/frontend.go @@ -47,7 +47,6 @@ func New( //nolint:funlen eventListener listener.Listener, updater types.Updater, - bridge types.Bridger, restarter types.Restarter, ) *frontendCLI { //nolint:revive fe := &frontendCLI{ @@ -55,7 +54,6 @@ func New( //nolint:funlen eventListener: eventListener, updater: updater, - bridge: bridge, restarter: restarter, } @@ -319,7 +317,8 @@ func (f *frontendCLI) watchEvents() { } // Loop starts the frontend loop with an interactive shell. -func (f *frontendCLI) Loop() error { +func (f *frontendCLI) Loop(b types.Bridger) error { + f.bridge = b f.Printf(` Welcome to %s interactive shell ___....___ diff --git a/internal/frontend/frontend.go b/internal/frontend/frontend.go index fc3e407b..b701787f 100644 --- a/internal/frontend/frontend.go +++ b/internal/frontend/frontend.go @@ -19,7 +19,6 @@ package frontend import ( - "github.com/ProtonMail/proton-bridge/v2/internal/bridge" "github.com/ProtonMail/proton-bridge/v2/internal/frontend/cli" "github.com/ProtonMail/proton-bridge/v2/internal/frontend/grpc" "github.com/ProtonMail/proton-bridge/v2/internal/frontend/types" @@ -28,8 +27,17 @@ import ( "github.com/ProtonMail/proton-bridge/v2/pkg/listener" ) +// Type describes the available types of frontend. +type Type int + +const ( + CLI Type = iota + GRPC + NonInteractive +) + type Frontend interface { - Loop() error + Loop(b types.Bridger) error NotifyManualUpdate(update updater.VersionInfo, canInstall bool) SetVersion(update updater.VersionInfo) NotifySilentUpdateInstalled() @@ -37,38 +45,38 @@ type Frontend interface { WaitUntilFrontendIsReady() } -// New returns initialized frontend based on `frontendType`, which can be `cli` or `grpc`. +// New returns initialized frontend based on `frontendType`, which can be `CLI` or `GRPC`. func New( - frontendType string, + frontendType Type, showWindowOnStart bool, panicHandler types.PanicHandler, eventListener listener.Listener, updater types.Updater, - bridge *bridge.Bridge, restarter types.Restarter, locations *locations.Locations, ) Frontend { switch frontendType { - case "grpc": + case GRPC: return grpc.NewService( showWindowOnStart, panicHandler, eventListener, updater, - bridge, restarter, locations, ) - case "cli": + case CLI: return cli.New( panicHandler, eventListener, updater, - bridge, restarter, ) + case NonInteractive: + fallthrough + default: return nil } diff --git a/internal/frontend/grpc/event_utils.go b/internal/frontend/grpc/event_utils.go new file mode 100644 index 00000000..cbbab163 --- /dev/null +++ b/internal/frontend/grpc/event_utils.go @@ -0,0 +1,32 @@ +// Copyright (c) 2022 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package grpc + +import "github.com/bradenaw/juniper/xslices" + +// isInternetStatus returns true iff the event is InternetStatus. +func (x *StreamEvent) isInternetStatus() bool { + appEvent := x.GetApp() + + return (appEvent != nil) && (appEvent.GetInternetStatus() != nil) +} + +// filterOutInternetStatusEvents return a copy of the events list where all internet connection events have been removed. +func filterOutInternetStatusEvents(events []*StreamEvent) []*StreamEvent { + return xslices.Filter(events, func(event *StreamEvent) bool { return !event.isInternetStatus() }) +} diff --git a/internal/frontend/grpc/config_test.go b/internal/frontend/grpc/grpc_test.go similarity index 57% rename from internal/frontend/grpc/config_test.go rename to internal/frontend/grpc/grpc_test.go index f54e6f88..37d31ebe 100644 --- a/internal/frontend/grpc/config_test.go +++ b/internal/frontend/grpc/grpc_test.go @@ -53,3 +53,25 @@ func TestConfig(t *testing.T) { // failure to save require.Error(t, conf2.save(filepath.Join(tempDir, "non/existing/folder", tempFileName))) } + +func TestIsInternetStatus(t *testing.T) { + require.True(t, NewInternetStatusEvent(true).isInternetStatus()) + require.True(t, NewInternetStatusEvent(false).isInternetStatus()) + require.False(t, NewKeychainHasNoKeychainEvent().isInternetStatus()) + require.False(t, NewLoginAlreadyLoggedInEvent("").isInternetStatus()) +} + +func TestFilterOutInternetStatusEvents(t *testing.T) { + require.Zero(t, len(filterOutInternetStatusEvents([]*StreamEvent{}))) + + off := NewInternetStatusEvent(false) + on := NewInternetStatusEvent(true) + show := NewShowMainWindowEvent() + finished := NewLoginFinishedEvent("id") + + require.Zero(t, len(filterOutInternetStatusEvents([]*StreamEvent{}))) + require.Zero(t, len(filterOutInternetStatusEvents([]*StreamEvent{off, on, off}))) + require.Equal(t, filterOutInternetStatusEvents([]*StreamEvent{off, show, on}), []*StreamEvent{show}) + require.Equal(t, filterOutInternetStatusEvents([]*StreamEvent{finished, off, show, on}), []*StreamEvent{finished, show}) + require.Equal(t, filterOutInternetStatusEvents([]*StreamEvent{finished, show}), []*StreamEvent{finished, show}) +} diff --git a/internal/frontend/grpc/service.go b/internal/frontend/grpc/service.go index bf95a244..5e6bf74f 100644 --- a/internal/frontend/grpc/service.go +++ b/internal/frontend/grpc/service.go @@ -90,7 +90,6 @@ func NewService( panicHandler types.PanicHandler, eventListener listener.Listener, updater types.Updater, - bridge types.Bridger, restarter types.Restarter, locations *locations.Locations, ) *Service { @@ -99,7 +98,6 @@ func NewService( panicHandler: panicHandler, eventListener: eventListener, updater: updater, - bridge: bridge, restarter: restarter, showOnStartup: showOnStartup, @@ -115,6 +113,15 @@ func NewService( // set to 1 s.initializing.Add(1) + go func() { + defer s.panicHandler.HandlePanic() + s.watchEvents() + }() + + return &s +} + +func (s *Service) startGRPCServer() { tlsConfig, pemCert, err := s.generateTLSConfig() if err != nil { s.log.WithError(err).Panic("could not generate gRPC TLS config") @@ -122,10 +129,9 @@ func NewService( s.pemCert = string(pemCert) - s.initAutostart() s.grpcServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig))) - RegisterBridgeServer(s.grpcServer, &s) + RegisterBridgeServer(s.grpcServer, s) s.listener, err = net.Listen("tcp", "127.0.0.1:0") // Port 0 means that the port is randomly picked by the system. if err != nil { @@ -137,8 +143,6 @@ func NewService( } s.log.Info("gRPC server listening at ", s.listener.Addr()) - - return &s } func (s *Service) initAutostart() { @@ -158,15 +162,18 @@ func (s *Service) initAutostart() { }) } -func (s *Service) Loop() error { +func (s *Service) Loop(b types.Bridger) error { + s.bridge = b + s.initAutostart() + s.startGRPCServer() + defer func() { s.bridge.SetBool(settings.FirstStartGUIKey, false) }() - go func() { - defer s.panicHandler.HandlePanic() - s.watchEvents() - }() + if s.bridge.HasError(bridge.ErrLocalCacheUnavailable) { + _ = s.SendEvent(NewCacheErrorEvent(CacheErrorType_CACHE_UNAVAILABLE_ERROR)) + } err := s.grpcServer.Serve(s.listener) if err != nil { @@ -203,10 +210,6 @@ func (s *Service) WaitUntilFrontendIsReady() { } func (s *Service) watchEvents() { // nolint:funlen - if s.bridge.HasError(bridge.ErrLocalCacheUnavailable) { - _ = s.SendEvent(NewCacheErrorEvent(CacheErrorType_CACHE_UNAVAILABLE_ERROR)) - } - errorCh := s.eventListener.ProvideChannel(events.ErrorEvent) credentialsErrorCh := s.eventListener.ProvideChannel(events.CredentialsErrorEvent) noActiveKeyForRecipientCh := s.eventListener.ProvideChannel(events.NoActiveKeyForRecipientEvent) @@ -258,6 +261,10 @@ func (s *Service) watchEvents() { // nolint:funlen case address := <-addressChangedLogoutCh: _ = s.SendEvent(NewMailAddressChangeLogoutEvent(address)) case userID := <-logoutCh: + if s.bridge == nil { + logrus.Error("Received a logout event but bridge is not yet instantiated.") + break + } user, err := s.bridge.GetUserInfo(userID) if err != nil { return diff --git a/internal/frontend/grpc/service_stream.go b/internal/frontend/grpc/service_stream.go index d8d1beda..f671a92d 100644 --- a/internal/frontend/grpc/service_stream.go +++ b/internal/frontend/grpc/service_stream.go @@ -95,12 +95,8 @@ func (s *Service) StopEventStream(ctx context.Context, _ *emptypb.Empty) (*empty // SendEvent sends an event to the via the gRPC event stream. func (s *Service) SendEvent(event *StreamEvent) error { - s.eventQueueMutex.Lock() - defer s.eventQueueMutex.Unlock() - - if s.eventStreamCh == nil { - // nobody is connected to the event stream, we queue events - s.eventQueue = append(s.eventQueue, event) + if s.eventStreamCh == nil { // nobody is connected to the event stream, we queue events + s.queueEvent(event) return nil } @@ -175,3 +171,14 @@ func (s *Service) StartEventTest() error { //nolint:funlen return nil } + +func (s *Service) queueEvent(event *StreamEvent) { + s.eventQueueMutex.Lock() + defer s.eventQueueMutex.Unlock() + + if event.isInternetStatus() { + s.eventQueue = append(filterOutInternetStatusEvents(s.eventQueue), event) + } else { + s.eventQueue = append(s.eventQueue, event) + } +}