forked from Silverfish/proton-bridge
GODT-1913: pass reporter to gluon, limit restarts, add crash handlers.
This commit is contained in:
233
internal/async/group.go
Normal file
233
internal/async/group.go
Normal file
@ -0,0 +1,233 @@
|
||||
// Copyright (c) 2022 Proton AG
|
||||
//
|
||||
// This file is part of Proton Mail Bridge.
|
||||
//
|
||||
// Proton Mail Bridge is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// Proton Mail Bridge is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package async
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PanicHandler interface {
|
||||
HandlePanic()
|
||||
}
|
||||
|
||||
// Group is forked and improved version of "github.com/bradenaw/juniper/xsync.Group".
|
||||
//
|
||||
// It manages a group of goroutines. The main change to original is posibility
|
||||
// to wait passed function to finish without canceling it's context and adding
|
||||
// PanicHandler.
|
||||
type Group struct {
|
||||
baseCtx context.Context
|
||||
ctx context.Context
|
||||
jobCtx context.Context
|
||||
cancel context.CancelFunc
|
||||
finish context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
panicHandler PanicHandler
|
||||
}
|
||||
|
||||
// NewGroup returns a Group ready for use. The context passed to any of the f functions will be a
|
||||
// descendant of ctx.
|
||||
func NewGroup(ctx context.Context, panicHandler PanicHandler) *Group {
|
||||
bgCtx, cancel := context.WithCancel(ctx)
|
||||
jobCtx, finish := context.WithCancel(ctx)
|
||||
return &Group{
|
||||
baseCtx: ctx,
|
||||
ctx: bgCtx,
|
||||
jobCtx: jobCtx,
|
||||
cancel: cancel,
|
||||
finish: finish,
|
||||
panicHandler: panicHandler,
|
||||
}
|
||||
}
|
||||
|
||||
// Once calls f once from another goroutine.
|
||||
func (g *Group) Once(f func(ctx context.Context)) {
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.handlePanic()
|
||||
|
||||
f(g.ctx)
|
||||
g.wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// jitterDuration returns a random duration in [d - jitter, d + jitter].
|
||||
func jitterDuration(d time.Duration, jitter time.Duration) time.Duration {
|
||||
return d + time.Duration(float64(jitter)*((rand.Float64()*2)-1)) //nolint:gosec
|
||||
}
|
||||
|
||||
// Periodic spawns a goroutine that calls f once per interval +/- jitter.
|
||||
func (g *Group) Periodic(
|
||||
interval time.Duration,
|
||||
jitter time.Duration,
|
||||
f func(ctx context.Context),
|
||||
) {
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.handlePanic()
|
||||
|
||||
defer g.wg.Done()
|
||||
|
||||
t := time.NewTimer(jitterDuration(interval, jitter))
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
if g.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-g.jobCtx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
}
|
||||
|
||||
t.Reset(jitterDuration(interval, jitter))
|
||||
f(g.ctx)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Trigger spawns a goroutine which calls f whenever the returned function is called. If f is
|
||||
// already running when triggered, f will run again immediately when it finishes.
|
||||
func (g *Group) Trigger(f func(ctx context.Context)) func() {
|
||||
c := make(chan struct{}, 1)
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.handlePanic()
|
||||
|
||||
defer g.wg.Done()
|
||||
|
||||
for {
|
||||
if g.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-g.jobCtx.Done():
|
||||
return
|
||||
case <-c:
|
||||
}
|
||||
f(g.ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
return func() {
|
||||
select {
|
||||
case c <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// PeriodicOrTrigger spawns a goroutine which calls f whenever the returned function is called. If
|
||||
// f is already running when triggered, f will run again immediately when it finishes. Also calls f
|
||||
// when it has been interval+/-jitter since the last trigger.
|
||||
func (g *Group) PeriodicOrTrigger(
|
||||
interval time.Duration,
|
||||
jitter time.Duration,
|
||||
f func(ctx context.Context),
|
||||
) func() {
|
||||
c := make(chan struct{}, 1)
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.handlePanic()
|
||||
|
||||
defer g.wg.Done()
|
||||
|
||||
t := time.NewTimer(jitterDuration(interval, jitter))
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
if g.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-g.jobCtx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
t.Reset(jitterDuration(interval, jitter))
|
||||
case <-c:
|
||||
if !t.Stop() {
|
||||
<-t.C
|
||||
}
|
||||
t.Reset(jitterDuration(interval, jitter))
|
||||
}
|
||||
f(g.ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
return func() {
|
||||
select {
|
||||
case c <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Group) resetCtx() {
|
||||
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
|
||||
g.ctx, g.cancel = context.WithCancel(g.baseCtx)
|
||||
}
|
||||
|
||||
// Cancel is send to all of the spawn goroutines and ends periodic
|
||||
// or trigger routines.
|
||||
func (g *Group) Cancel() {
|
||||
g.cancel()
|
||||
g.finish()
|
||||
g.resetCtx()
|
||||
}
|
||||
|
||||
// Finish will ends all periodic or polls routines. It will let
|
||||
// currently running functions to finish (cancel is not sent).
|
||||
//
|
||||
// It is not safe to call Wait concurrently with any other method on g.
|
||||
func (g *Group) Finish() {
|
||||
g.finish()
|
||||
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
|
||||
}
|
||||
|
||||
// CancelAndWait cancels the context passed to any of the spawned goroutines and waits for all spawned
|
||||
// goroutines to exit.
|
||||
//
|
||||
// It is not safe to call Wait concurrently with any other method on g.
|
||||
func (g *Group) CancelAndWait() {
|
||||
g.finish()
|
||||
g.cancel()
|
||||
g.wg.Wait()
|
||||
g.resetCtx()
|
||||
}
|
||||
|
||||
// WaitToFinish will ends all periodic or polls routines. It will wait for
|
||||
// currently running functions to finish (cancel is not sent).
|
||||
//
|
||||
// It is not safe to call Wait concurrently with any other method on g.
|
||||
func (g *Group) WaitToFinish() {
|
||||
g.finish()
|
||||
g.wg.Wait()
|
||||
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
|
||||
}
|
||||
|
||||
func (g *Group) handlePanic() {
|
||||
if g.panicHandler != nil {
|
||||
g.panicHandler.HandlePanic()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user