fix(GODT-2800): Pending Subscriptions Cleanup

Ensure pending subscriptions are cleaned up with `Service.Close()` or
we can leak go routines.
This commit is contained in:
Leander Beernaert
2023-07-24 16:59:04 +02:00
parent 776976a8a2
commit bdbf1bdd76
2 changed files with 47 additions and 2 deletions

View File

@ -62,8 +62,8 @@ type VaultEventIDStore struct {
vault *vault.User
}
func NewVaultEventIDStore(vault *VaultEventIDStore) *VaultEventIDStore {
return &VaultEventIDStore{vault: vault.vault}
func NewVaultEventIDStore(vault *vault.User) *VaultEventIDStore {
return &VaultEventIDStore{vault: vault}
}
func (v VaultEventIDStore) Load(_ context.Context) (string, error) {

View File

@ -32,7 +32,9 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal"
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/pkg/cpc"
"github.com/bradenaw/juniper/xslices"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
)
// Service polls from the given event source and ensures that all the respective subscribers get notified
@ -125,6 +127,27 @@ func (s Subscription) cancel() {
}
}
func (s Subscription) close() {
if s.User != nil {
s.User.close()
}
if s.Refresh != nil {
s.Refresh.close()
}
if s.Address != nil {
s.Address.close()
}
if s.Labels != nil {
s.Labels.close()
}
if s.Messages != nil {
s.Messages.close()
}
if s.UserUsedSpace != nil {
s.UserUsedSpace.close()
}
}
// Subscribe adds new subscribers to the service.
// This method can safely be called during event handling.
func (s *Service) Subscribe(subscription Subscription) {
@ -276,6 +299,28 @@ func (s *Service) run(ctx context.Context, lastEventID string) {
}
}
// Close should be called after the service has been cancelled to clean up any remaining pending operations.
func (s *Service) Close() {
s.pendingSubscriptionsLock.Lock()
defer s.pendingSubscriptionsLock.Unlock()
// Cleanup pending removes.
for _, subscription := range s.pendingSubscriptionsRemove {
subscription.close()
}
// Cleanup pending adds.
for _, subscription := range xslices.Filter(s.pendingSubscriptionsAdd, func(sub Subscription) bool {
return !slices.Contains(s.pendingSubscriptionsRemove, sub)
}) {
subscription.cancel()
subscription.close()
}
s.pendingSubscriptionsRemove = nil
s.pendingSubscriptionsAdd = nil
}
func (s *Service) handleEvent(ctx context.Context, lastEventID string, event proton.Event) error {
s.log.WithFields(logrus.Fields{
"old": lastEventID,