From b72de5e3a4b6fc3f9986ad9c7178561eb031f14a Mon Sep 17 00:00:00 2001 From: James Houlahan Date: Fri, 28 Oct 2022 16:16:53 +0200 Subject: [PATCH] Other: Fix goroutine leak in OnStatusDown We should spawn the goroutine as a bridge async task rather than as a normal goroutine --- internal/bridge/bridge.go | 52 +++++++++++++++++++-------------------- internal/user/user.go | 4 +-- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/internal/bridge/bridge.go b/internal/bridge/bridge.go index 5ef1e3be..79f19a14 100644 --- a/internal/bridge/bridge.go +++ b/internal/bridge/bridge.go @@ -271,10 +271,12 @@ func (bridge *Bridge) init(tlsReporter TLSReporter) error { bridge.api.AddStatusObserver(func(status liteapi.Status) { switch { case status == liteapi.StatusUp: - go bridge.onStatusUp() + bridge.publish(events.ConnStatusUp{}) + bridge.tasks.Once(bridge.onStatusUp) case status == liteapi.StatusDown: - go bridge.onStatusDown() + bridge.publish(events.ConnStatusDown{}) + bridge.tasks.Once(bridge.onStatusDown) } }) @@ -429,48 +431,36 @@ func (bridge *Bridge) remWatcher(watcher *watcher.Watcher[events.Event]) { watcher.Close() } -func (bridge *Bridge) onStatusUp() { - bridge.publish(events.ConnStatusUp{}) - +func (bridge *Bridge) onStatusUp(ctx context.Context) { safe.RLock(func() { for _, user := range bridge.users { - user.OnStatusUp() + user.OnStatusUp(ctx) } }, bridge.usersLock) bridge.goLoad() } -func (bridge *Bridge) onStatusDown() { - bridge.publish(events.ConnStatusDown{}) - +func (bridge *Bridge) onStatusDown(ctx context.Context) { safe.RLock(func() { for _, user := range bridge.users { - user.OnStatusDown() + user.OnStatusDown(ctx) } }, bridge.usersLock) - bridge.tasks.Once(func(ctx context.Context) { - backoff := time.Second + for backoff := time.Second; ; backoff = min(backoff*2, 30*time.Second) { + select { + case <-ctx.Done(): + return - for { - select { - case <-ctx.Done(): + case <-time.After(backoff): + if err := bridge.api.Ping(ctx); err != nil { + logrus.WithError(err).Debug("Failed to ping API, will retry") + } else { return - - case <-time.After(backoff): - if err := bridge.api.Ping(ctx); err != nil { - logrus.WithError(err).Debug("Failed to ping API, will retry") - } else { - return - } - } - - if backoff < 30*time.Second { - backoff *= 2 } } - }) + } } func loadTLSConfig(vault *vault.Vault) (*tls.Config, error) { @@ -502,3 +492,11 @@ func newListener(port int, useTLS bool, tlsConfig *tls.Config) (net.Listener, er return netListener, nil } + +func min(a, b time.Duration) time.Duration { + if a < b { + return a + } + + return b +} diff --git a/internal/user/user.go b/internal/user/user.go index 50c0df89..5b8ce9b4 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -522,12 +522,12 @@ func (user *User) CheckAuth(email string, password []byte) (string, error) { } // OnStatusUp is called when the connection goes up. -func (user *User) OnStatusUp() { +func (user *User) OnStatusUp(context.Context) { user.goSync() } // OnStatusDown is called when the connection goes down. -func (user *User) OnStatusDown() { +func (user *User) OnStatusDown(context.Context) { user.abortable.Abort() }