Other: Fix goroutine leaks in integration tests

We were closing the event QueuedChannel objects in the wrong place;
they should have been closed on test teardown, not on stopBridge
(which was just a test action and wasn't always called).

In order to make the events more scalable, i replace all the
QueuedChannel objects with a single event collector, which would
create QueuedChannels on demand when it receives an event of a new type.
This commit is contained in:
James Houlahan
2022-10-24 18:56:15 +02:00
parent 7df2d70dbf
commit cb04dabea8
3 changed files with 180 additions and 186 deletions

View File

@ -21,8 +21,11 @@ import (
"context"
"fmt"
"net/smtp"
"reflect"
"regexp"
"sync"
"testing"
"time"
"github.com/Masterminds/semver/v3"
"github.com/ProtonMail/gluon/queue"
@ -31,6 +34,7 @@ import (
"github.com/ProtonMail/proton-bridge/v2/internal/locations"
"github.com/bradenaw/juniper/xslices"
"github.com/emersion/go-imap/client"
"github.com/sirupsen/logrus"
"gitlab.protontech.ch/go/liteapi"
"gitlab.protontech.ch/go/liteapi/server"
"golang.org/x/exp/maps"
@ -47,24 +51,11 @@ type testCtx struct {
storeKey []byte
version *semver.Version
mocks *bridge.Mocks
events *eventCollector
// bridge holds the bridge app under test.
bridge *bridge.Bridge
// These channels hold events of various types coming from bridge.
loginCh *queue.QueuedChannel[events.UserLoggedIn]
logoutCh *queue.QueuedChannel[events.UserLoggedOut]
loadedCh *queue.QueuedChannel[events.AllUsersLoaded]
deletedCh *queue.QueuedChannel[events.UserDeleted]
deauthCh *queue.QueuedChannel[events.UserDeauth]
addrCreatedCh *queue.QueuedChannel[events.UserAddressCreated]
addrDeletedCh *queue.QueuedChannel[events.UserAddressDeleted]
syncStartedCh *queue.QueuedChannel[events.SyncStarted]
syncFinishedCh *queue.QueuedChannel[events.SyncFinished]
forcedUpdateCh *queue.QueuedChannel[events.UpdateForced]
connStatusCh *queue.QueuedChannel[events.Event]
updateCh *queue.QueuedChannel[events.Event]
// These maps hold expected userIDByName, their primary addresses and bridge passwords.
userIDByName map[string]string
userAddrByEmail map[string]map[string]string
@ -101,8 +92,9 @@ func newTestCtx(tb testing.TB) *testCtx {
netCtl: liteapi.NewNetCtl(),
locator: locations.New(bridge.NewTestLocationsProvider(dir), "config-name"),
storeKey: []byte("super-secret-store-key"),
mocks: bridge.NewMocks(tb, defaultVersion, defaultVersion),
version: defaultVersion,
mocks: bridge.NewMocks(tb, defaultVersion, defaultVersion),
events: newEventCollector(),
userIDByName: make(map[string]string),
userAddrByEmail: make(map[string]map[string]string),
@ -250,7 +242,13 @@ func (t *testCtx) getLastError() error {
func (t *testCtx) close(ctx context.Context) error {
for _, client := range t.imapClients {
if err := client.client.Logout(); err != nil {
return err
logrus.WithError(err).Error("Failed to logout IMAP client")
}
}
for _, client := range t.smtpClients {
if err := client.client.Close(); err != nil {
logrus.WithError(err).Error("Failed to close SMTP client")
}
}
@ -262,5 +260,81 @@ func (t *testCtx) close(ctx context.Context) error {
t.api.Close()
t.events.close()
return nil
}
type eventCollector struct {
events map[reflect.Type]*queue.QueuedChannel[events.Event]
lock sync.RWMutex
wg sync.WaitGroup
}
func newEventCollector() *eventCollector {
return &eventCollector{
events: make(map[reflect.Type]*queue.QueuedChannel[events.Event]),
}
}
func (c *eventCollector) collectFrom(eventCh <-chan events.Event) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for event := range eventCh {
c.push(event)
}
}()
}
func awaitType[T events.Event](c *eventCollector, ofType T, timeout time.Duration) (T, bool) {
if event := c.await(ofType, timeout); event == nil {
return *new(T), false //nolint:gocritic
} else if event, ok := event.(T); !ok {
panic(fmt.Errorf("unexpected event type %T", event))
} else {
return event, true
}
}
func (c *eventCollector) await(ofType events.Event, timeout time.Duration) events.Event {
select {
case event := <-c.getEventCh(ofType):
return event
case <-time.After(timeout):
return nil
}
}
func (c *eventCollector) push(event events.Event) {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.events[reflect.TypeOf(event)]; !ok {
c.events[reflect.TypeOf(event)] = queue.NewQueuedChannel[events.Event](0, 0)
}
c.events[reflect.TypeOf(event)].Enqueue(event)
}
func (c *eventCollector) getEventCh(ofType events.Event) <-chan events.Event {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.events[reflect.TypeOf(ofType)]; !ok {
c.events[reflect.TypeOf(ofType)] = queue.NewQueuedChannel[events.Event](0, 0)
}
return c.events[reflect.TypeOf(ofType)].GetChannel()
}
func (c *eventCollector) close() {
c.wg.Wait()
for _, eventCh := range c.events {
eventCh.CloseAndDiscardQueued()
}
}