1
0

Other: Fix goroutine leak in OnStatusDown

We should spawn the goroutine as a bridge async task rather than as a
normal goroutine
This commit is contained in:
James Houlahan
2022-10-28 16:16:53 +02:00
parent bf4afae5d9
commit b72de5e3a4
2 changed files with 27 additions and 29 deletions

View File

@ -271,10 +271,12 @@ func (bridge *Bridge) init(tlsReporter TLSReporter) error {
bridge.api.AddStatusObserver(func(status liteapi.Status) { bridge.api.AddStatusObserver(func(status liteapi.Status) {
switch { switch {
case status == liteapi.StatusUp: case status == liteapi.StatusUp:
go bridge.onStatusUp() bridge.publish(events.ConnStatusUp{})
bridge.tasks.Once(bridge.onStatusUp)
case status == liteapi.StatusDown: case status == liteapi.StatusDown:
go bridge.onStatusDown() bridge.publish(events.ConnStatusDown{})
bridge.tasks.Once(bridge.onStatusDown)
} }
}) })
@ -429,48 +431,36 @@ func (bridge *Bridge) remWatcher(watcher *watcher.Watcher[events.Event]) {
watcher.Close() watcher.Close()
} }
func (bridge *Bridge) onStatusUp() { func (bridge *Bridge) onStatusUp(ctx context.Context) {
bridge.publish(events.ConnStatusUp{})
safe.RLock(func() { safe.RLock(func() {
for _, user := range bridge.users { for _, user := range bridge.users {
user.OnStatusUp() user.OnStatusUp(ctx)
} }
}, bridge.usersLock) }, bridge.usersLock)
bridge.goLoad() bridge.goLoad()
} }
func (bridge *Bridge) onStatusDown() { func (bridge *Bridge) onStatusDown(ctx context.Context) {
bridge.publish(events.ConnStatusDown{})
safe.RLock(func() { safe.RLock(func() {
for _, user := range bridge.users { for _, user := range bridge.users {
user.OnStatusDown() user.OnStatusDown(ctx)
} }
}, bridge.usersLock) }, bridge.usersLock)
bridge.tasks.Once(func(ctx context.Context) { for backoff := time.Second; ; backoff = min(backoff*2, 30*time.Second) {
backoff := time.Second select {
case <-ctx.Done():
return
for { case <-time.After(backoff):
select { if err := bridge.api.Ping(ctx); err != nil {
case <-ctx.Done(): logrus.WithError(err).Debug("Failed to ping API, will retry")
} else {
return return
case <-time.After(backoff):
if err := bridge.api.Ping(ctx); err != nil {
logrus.WithError(err).Debug("Failed to ping API, will retry")
} else {
return
}
}
if backoff < 30*time.Second {
backoff *= 2
} }
} }
}) }
} }
func loadTLSConfig(vault *vault.Vault) (*tls.Config, error) { func loadTLSConfig(vault *vault.Vault) (*tls.Config, error) {
@ -502,3 +492,11 @@ func newListener(port int, useTLS bool, tlsConfig *tls.Config) (net.Listener, er
return netListener, nil return netListener, nil
} }
func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

View File

@ -522,12 +522,12 @@ func (user *User) CheckAuth(email string, password []byte) (string, error) {
} }
// OnStatusUp is called when the connection goes up. // OnStatusUp is called when the connection goes up.
func (user *User) OnStatusUp() { func (user *User) OnStatusUp(context.Context) {
user.goSync() user.goSync()
} }
// OnStatusDown is called when the connection goes down. // OnStatusDown is called when the connection goes down.
func (user *User) OnStatusDown() { func (user *User) OnStatusDown(context.Context) {
user.abortable.Abort() user.abortable.Abort()
} }