From 6bbaf03f1f80fe969364013281521d1e3aa99cb4 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Fri, 21 Oct 2022 13:58:18 +0200 Subject: [PATCH] Other: Fix goroutine leaks in sync tests Add missing Close calls. Properly handle nil channel for `user.startSync`. This patch also updated liteapi and Gluon to latest master and dev version respectively. --- COPYING_NOTES.md | 1 + go.mod | 3 ++- go.sum | 8 +++++--- internal/bridge/bridge.go | 15 ++++++++++++++- internal/bridge/sync_test.go | 5 +++++ internal/bridge/user.go | 3 +++ internal/user/events.go | 2 +- internal/user/types.go | 12 +++++------- internal/user/user.go | 23 +++++++++++++++-------- tests/ctx_bridge_test.go | 12 ++++++++++++ 10 files changed, 63 insertions(+), 21 deletions(-) diff --git a/COPYING_NOTES.md b/COPYING_NOTES.md index 67963d5d..a54faf11 100644 --- a/COPYING_NOTES.md +++ b/COPYING_NOTES.md @@ -57,6 +57,7 @@ Proton Mail Bridge includes the following 3rd party software: * [testify](https://github.com/stretchr/testify) available under [license](https://github.com/stretchr/testify/blob/master/LICENSE) * [cli](https://github.com/urfave/cli/v2) available under [license](https://github.com/urfave/cli/v2/blob/master/LICENSE) * [liteapi](https://gitlab.protontech.ch/go/liteapi) +* [goleak](https://go.uber.org/goleak) * [exp](https://golang.org/x/exp) available under [license](https://cs.opensource.google/go/x/exp/+/master:LICENSE) * [net](https://golang.org/x/net) available under [license](https://cs.opensource.google/go/x/net/+/master:LICENSE) * [sys](https://golang.org/x/sys) available under [license](https://cs.opensource.google/go/x/sys/+/master:LICENSE) diff --git a/go.mod b/go.mod index 6fb5b757..445c59fe 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557 github.com/Masterminds/semver/v3 v3.1.1 - github.com/ProtonMail/gluon v0.13.0 + github.com/ProtonMail/gluon v0.13.1-0.20221021093632-0b277a6d0226 github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a github.com/ProtonMail/go-rfc5322 v0.11.0 github.com/ProtonMail/gopenpgp/v2 v2.4.10 @@ -39,6 +39,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/urfave/cli/v2 v2.16.3 gitlab.protontech.ch/go/liteapi v0.35.0 + go.uber.org/goleak v1.2.0 golang.org/x/exp v0.0.0-20220921164117-439092de6870 golang.org/x/net v0.1.0 golang.org/x/sys v0.1.0 diff --git a/go.sum b/go.sum index ce3a192a..44de99b9 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo= github.com/ProtonMail/docker-credential-helpers v1.1.0 h1:+kvUIpwWcbtP3WFv5sSvkFn/XLzSqPOB5AAthuk9xPk= github.com/ProtonMail/docker-credential-helpers v1.1.0/go.mod h1:mK0aBveCxhnQ756AmaTfXMZDeULvheYVhF/MWMErN5g= -github.com/ProtonMail/gluon v0.13.0 h1:WgL32KvMcanomDP3Z0mSs61QYmNHAtSEbVlimD5seiU= -github.com/ProtonMail/gluon v0.13.0/go.mod h1:XW/gcr4jErc5bX5yMqkUq3U+AucC2QZHJ5L231k3Nw4= +github.com/ProtonMail/gluon v0.13.1-0.20221021093632-0b277a6d0226 h1:PWPtXMDKHGY3tG6SEeKE5B7BgfHqTDui4DREb0GoTfU= +github.com/ProtonMail/gluon v0.13.1-0.20221021093632-0b277a6d0226/go.mod h1:XW/gcr4jErc5bX5yMqkUq3U+AucC2QZHJ5L231k3Nw4= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4= github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo= @@ -405,7 +405,8 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -436,6 +437,7 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= diff --git a/internal/bridge/bridge.go b/internal/bridge/bridge.go index 6d0cf127..beebf594 100644 --- a/internal/bridge/bridge.go +++ b/internal/bridge/bridge.go @@ -93,6 +93,8 @@ type Bridge struct { // stopCh is used to stop ongoing goroutines when the bridge is closed. stopCh chan struct{} + + closeEventChFn func() } // New creates a new bridge. @@ -165,7 +167,9 @@ func New( //nolint:funlen ) // Get an event channel for all events (individual events can be subscribed to later). - eventCh, _ := bridge.GetEvents() + eventCh, closeFn := bridge.GetEvents() + + bridge.closeEventChFn = closeFn if err := bridge.init(tlsReporter); err != nil { return nil, nil, fmt.Errorf("failed to initialize bridge: %w", err) @@ -303,6 +307,14 @@ func (bridge *Bridge) GetErrors() []error { } func (bridge *Bridge) Close(ctx context.Context) error { + defer func() { + if bridge.closeEventChFn != nil { + bridge.closeEventChFn() + } + + bridge.closeEventChFn = nil + }() + // Stop ongoing operations such as connectivity checks. close(bridge.stopCh) @@ -356,6 +368,7 @@ func (bridge *Bridge) addWatcher(ofType ...events.Event) *watcher.Watcher[events } func (bridge *Bridge) remWatcher(oldWatcher *watcher.Watcher[events.Event]) { + oldWatcher.Close() bridge.watchers.Delete(oldWatcher) } diff --git a/internal/bridge/sync_test.go b/internal/bridge/sync_test.go index d91be392..ac960492 100644 --- a/internal/bridge/sync_test.go +++ b/internal/bridge/sync_test.go @@ -25,6 +25,8 @@ import ( "runtime" "testing" + "go.uber.org/goleak" + "github.com/ProtonMail/proton-bridge/v2/internal/bridge" "github.com/ProtonMail/proton-bridge/v2/internal/events" "github.com/bradenaw/juniper/iterator" @@ -36,6 +38,8 @@ import ( ) func TestBridge_Sync(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + s := server.New() defer s.Close() @@ -56,6 +60,7 @@ func TestBridge_Sync(t *testing.T) { liteapi.WithTransport(liteapi.InsecureTransport()), ).NewClientWithLogin(ctx, "imap", password) require.NoError(t, err) + defer c.Close() user, err := c.GetUser(ctx) require.NoError(t, err) diff --git a/internal/bridge/user.go b/internal/bridge/user.go index cc8f161f..745de222 100644 --- a/internal/bridge/user.go +++ b/internal/bridge/user.go @@ -394,6 +394,9 @@ func (bridge *Bridge) addUserWithVault( return fmt.Errorf("failed to create user: %w", err) } + if bridge.users.Has(apiUser.ID) { + panic("double add") + } bridge.users.Set(apiUser.ID, user) // Connect the user's address(es) to gluon. diff --git a/internal/user/events.go b/internal/user/events.go index db0b284e..e19dd9e0 100644 --- a/internal/user/events.go +++ b/internal/user/events.go @@ -154,7 +154,7 @@ func (user *User) handleDeleteAddressEvent(_ context.Context, event liteapi.Addr if ok := user.updateCh.GetDelete(event.ID, func(updateCh *queue.QueuedChannel[imap.Update]) { if user.vault.AddressMode() == vault.SplitMode { - updateCh.Close() + updateCh.CloseAndDiscardQueued() } }); !ok { return fmt.Errorf("no such address %q", event.ID) diff --git a/internal/user/types.go b/internal/user/types.go index 8e27c8ea..585fa094 100644 --- a/internal/user/types.go +++ b/internal/user/types.go @@ -94,21 +94,19 @@ func getAddrID(apiAddrs []liteapi.Address, email string) (string, error) { } // contextWithStopCh returns a new context that is cancelled when the stop channel is closed or a value is sent to it. -func contextWithStopCh(ctx context.Context, stopCh ...<-chan struct{}) (context.Context, context.CancelFunc) { +func contextWithStopCh(ctx context.Context, channels ...<-chan struct{}) (context.Context, context.CancelFunc) { ctx, cancel := context.WithCancel(ctx) - for _, stopCh := range stopCh { - stopCh := stopCh - - go func() { + for _, stopCh := range channels { + go func(ch <-chan struct{}) { select { - case <-stopCh: + case <-ch: cancel() case <-ctx.Done(): // ... } - }() + }(stopCh) } return ctx, cancel diff --git a/internal/user/user.go b/internal/user/user.go index e02df90c..174c1b66 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "strings" + "sync" "sync/atomic" "time" @@ -55,6 +56,7 @@ type User struct { syncStopCh chan struct{} syncLock try.Group + syncWG sync.WaitGroup showAllMail int32 } @@ -135,10 +137,13 @@ func New(ctx context.Context, encVault *vault.User, client *liteapi.Client, apiU // GODT-1946 - Don't start the event loop until the initial sync has finished. eventCh := user.client.NewEventStream(EventPeriod, EventJitter, user.vault.EventID()) + user.syncWG.Add(1) // If we haven't synced yet, do it first. // If it fails, we don't start the event loop. // Otherwise, begin processing API events, logging any errors that occur. go func() { + defer user.syncWG.Done() + if err := <-user.startSync(); err != nil { return } @@ -200,7 +205,7 @@ func (user *User) SetAddressMode(ctx context.Context, mode vault.AddressMode) er user.updateCh.Values(func(updateCh []*queue.QueuedChannel[imap.Update]) { for _, updateCh := range xslices.Unique(updateCh) { - updateCh.Close() + updateCh.CloseAndDiscardQueued() } }) @@ -393,6 +398,8 @@ func (user *User) Logout(ctx context.Context) error { // Close closes ongoing connections and cleans up resources. func (user *User) Close() error { + defer user.syncWG.Wait() + // Close any ongoing operations. close(user.stopCh) @@ -405,12 +412,12 @@ func (user *User) Close() error { // Close the user's update channels. user.updateCh.Values(func(updateCh []*queue.QueuedChannel[imap.Update]) { for _, updateCh := range xslices.Unique(updateCh) { - updateCh.Close() + updateCh.CloseAndDiscardQueued() } }) // Close the user's notify channel. - user.eventCh.Close() + user.eventCh.CloseAndDiscardQueued() // Close the user's vault. if err := user.vault.Close(); err != nil { @@ -462,16 +469,16 @@ func (user *User) streamEvents(eventCh <-chan liteapi.Event) <-chan error { // startSync begins a startSync for the user. func (user *User) startSync() <-chan error { - if user.vault.SyncStatus().IsComplete() { - logrus.Debug("Already synced, skipping") - return nil - } - errCh := make(chan error) user.syncLock.GoTry(func(ok bool) { defer close(errCh) + if user.vault.SyncStatus().IsComplete() { + logrus.Debug("Already synced, skipping") + return + } + if !ok { logrus.Debug("Sync already in progress, skipping") return diff --git a/tests/ctx_bridge_test.go b/tests/ctx_bridge_test.go index 75074398..f74b3ad4 100644 --- a/tests/ctx_bridge_test.go +++ b/tests/ctx_bridge_test.go @@ -161,6 +161,18 @@ func (t *testCtx) stopBridge() error { } t.bridge = nil + t.loginCh.CloseAndDiscardQueued() + t.logoutCh.CloseAndDiscardQueued() + t.loadedCh.CloseAndDiscardQueued() + t.deletedCh.CloseAndDiscardQueued() + t.deauthCh.CloseAndDiscardQueued() + t.addrCreatedCh.CloseAndDiscardQueued() + t.addrDeletedCh.CloseAndDiscardQueued() + t.syncStartedCh.CloseAndDiscardQueued() + t.syncFinishedCh.CloseAndDiscardQueued() + t.forcedUpdateCh.CloseAndDiscardQueued() + t.connStatusCh.CloseAndDiscardQueued() + t.updateCh.CloseAndDiscardQueued() return nil }