From bdbf1bdd7667de60b5ba4cfb156d9401850349a4 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Mon, 24 Jul 2023 16:59:04 +0200 Subject: [PATCH] fix(GODT-2800): Pending Subscriptions Cleanup Ensure pending subscriptions are cleaned up with `Service.Close()` or we can leak go routines. --- internal/services/userevents/eventid_store.go | 4 +- internal/services/userevents/service.go | 45 +++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/internal/services/userevents/eventid_store.go b/internal/services/userevents/eventid_store.go index ce4e3852..387ef736 100644 --- a/internal/services/userevents/eventid_store.go +++ b/internal/services/userevents/eventid_store.go @@ -62,8 +62,8 @@ type VaultEventIDStore struct { vault *vault.User } -func NewVaultEventIDStore(vault *VaultEventIDStore) *VaultEventIDStore { - return &VaultEventIDStore{vault: vault.vault} +func NewVaultEventIDStore(vault *vault.User) *VaultEventIDStore { + return &VaultEventIDStore{vault: vault} } func (v VaultEventIDStore) Load(_ context.Context) (string, error) { diff --git a/internal/services/userevents/service.go b/internal/services/userevents/service.go index 560f3575..cd52030e 100644 --- a/internal/services/userevents/service.go +++ b/internal/services/userevents/service.go @@ -32,7 +32,9 @@ import ( "github.com/ProtonMail/proton-bridge/v3/internal" "github.com/ProtonMail/proton-bridge/v3/internal/events" "github.com/ProtonMail/proton-bridge/v3/pkg/cpc" + "github.com/bradenaw/juniper/xslices" "github.com/sirupsen/logrus" + "golang.org/x/exp/slices" ) // Service polls from the given event source and ensures that all the respective subscribers get notified @@ -125,6 +127,27 @@ func (s Subscription) cancel() { } } +func (s Subscription) close() { + if s.User != nil { + s.User.close() + } + if s.Refresh != nil { + s.Refresh.close() + } + if s.Address != nil { + s.Address.close() + } + if s.Labels != nil { + s.Labels.close() + } + if s.Messages != nil { + s.Messages.close() + } + if s.UserUsedSpace != nil { + s.UserUsedSpace.close() + } +} + // Subscribe adds new subscribers to the service. // This method can safely be called during event handling. func (s *Service) Subscribe(subscription Subscription) { @@ -276,6 +299,28 @@ func (s *Service) run(ctx context.Context, lastEventID string) { } } +// Close should be called after the service has been cancelled to clean up any remaining pending operations. +func (s *Service) Close() { + s.pendingSubscriptionsLock.Lock() + defer s.pendingSubscriptionsLock.Unlock() + + // Cleanup pending removes. + for _, subscription := range s.pendingSubscriptionsRemove { + subscription.close() + } + + // Cleanup pending adds. + for _, subscription := range xslices.Filter(s.pendingSubscriptionsAdd, func(sub Subscription) bool { + return !slices.Contains(s.pendingSubscriptionsRemove, sub) + }) { + subscription.cancel() + subscription.close() + } + + s.pendingSubscriptionsRemove = nil + s.pendingSubscriptionsAdd = nil +} + func (s *Service) handleEvent(ctx context.Context, lastEventID string, event proton.Event) error { s.log.WithFields(logrus.Fields{ "old": lastEventID,