mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-10 04:36:43 +00:00
234 lines
5.3 KiB
Go
234 lines
5.3 KiB
Go
// Copyright (c) 2023 Proton AG
|
|
//
|
|
// This file is part of Proton Mail Bridge.
|
|
//
|
|
// Proton Mail Bridge is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// Proton Mail Bridge is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package async
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type PanicHandler interface {
|
|
HandlePanic()
|
|
}
|
|
|
|
// Group is forked and improved version of "github.com/bradenaw/juniper/xsync.Group".
|
|
//
|
|
// It manages a group of goroutines. The main change to original is posibility
|
|
// to wait passed function to finish without canceling it's context and adding
|
|
// PanicHandler.
|
|
type Group struct {
|
|
baseCtx context.Context
|
|
ctx context.Context
|
|
jobCtx context.Context
|
|
cancel context.CancelFunc
|
|
finish context.CancelFunc
|
|
wg sync.WaitGroup
|
|
|
|
panicHandler PanicHandler
|
|
}
|
|
|
|
// NewGroup returns a Group ready for use. The context passed to any of the f functions will be a
|
|
// descendant of ctx.
|
|
func NewGroup(ctx context.Context, panicHandler PanicHandler) *Group {
|
|
bgCtx, cancel := context.WithCancel(ctx)
|
|
jobCtx, finish := context.WithCancel(ctx)
|
|
return &Group{
|
|
baseCtx: ctx,
|
|
ctx: bgCtx,
|
|
jobCtx: jobCtx,
|
|
cancel: cancel,
|
|
finish: finish,
|
|
panicHandler: panicHandler,
|
|
}
|
|
}
|
|
|
|
// Once calls f once from another goroutine.
|
|
func (g *Group) Once(f func(ctx context.Context)) {
|
|
g.wg.Add(1)
|
|
go func() {
|
|
defer g.handlePanic()
|
|
|
|
f(g.ctx)
|
|
g.wg.Done()
|
|
}()
|
|
}
|
|
|
|
// jitterDuration returns a random duration in [d - jitter, d + jitter].
|
|
func jitterDuration(d time.Duration, jitter time.Duration) time.Duration {
|
|
return d + time.Duration(float64(jitter)*((rand.Float64()*2)-1)) //nolint:gosec
|
|
}
|
|
|
|
// Periodic spawns a goroutine that calls f once per interval +/- jitter.
|
|
func (g *Group) Periodic(
|
|
interval time.Duration,
|
|
jitter time.Duration,
|
|
f func(ctx context.Context),
|
|
) {
|
|
g.wg.Add(1)
|
|
go func() {
|
|
defer g.handlePanic()
|
|
|
|
defer g.wg.Done()
|
|
|
|
t := time.NewTimer(jitterDuration(interval, jitter))
|
|
defer t.Stop()
|
|
|
|
for {
|
|
if g.ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-g.jobCtx.Done():
|
|
return
|
|
case <-t.C:
|
|
}
|
|
|
|
t.Reset(jitterDuration(interval, jitter))
|
|
f(g.ctx)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Trigger spawns a goroutine which calls f whenever the returned function is called. If f is
|
|
// already running when triggered, f will run again immediately when it finishes.
|
|
func (g *Group) Trigger(f func(ctx context.Context)) func() {
|
|
c := make(chan struct{}, 1)
|
|
g.wg.Add(1)
|
|
go func() {
|
|
defer g.handlePanic()
|
|
|
|
defer g.wg.Done()
|
|
|
|
for {
|
|
if g.ctx.Err() != nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-g.jobCtx.Done():
|
|
return
|
|
case <-c:
|
|
}
|
|
f(g.ctx)
|
|
}
|
|
}()
|
|
|
|
return func() {
|
|
select {
|
|
case c <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// PeriodicOrTrigger spawns a goroutine which calls f whenever the returned function is called. If
|
|
// f is already running when triggered, f will run again immediately when it finishes. Also calls f
|
|
// when it has been interval+/-jitter since the last trigger.
|
|
func (g *Group) PeriodicOrTrigger(
|
|
interval time.Duration,
|
|
jitter time.Duration,
|
|
f func(ctx context.Context),
|
|
) func() {
|
|
c := make(chan struct{}, 1)
|
|
g.wg.Add(1)
|
|
go func() {
|
|
defer g.handlePanic()
|
|
|
|
defer g.wg.Done()
|
|
|
|
t := time.NewTimer(jitterDuration(interval, jitter))
|
|
defer t.Stop()
|
|
|
|
for {
|
|
if g.ctx.Err() != nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-g.jobCtx.Done():
|
|
return
|
|
case <-t.C:
|
|
t.Reset(jitterDuration(interval, jitter))
|
|
case <-c:
|
|
if !t.Stop() {
|
|
<-t.C
|
|
}
|
|
t.Reset(jitterDuration(interval, jitter))
|
|
}
|
|
f(g.ctx)
|
|
}
|
|
}()
|
|
|
|
return func() {
|
|
select {
|
|
case c <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (g *Group) resetCtx() {
|
|
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
|
|
g.ctx, g.cancel = context.WithCancel(g.baseCtx)
|
|
}
|
|
|
|
// Cancel is send to all of the spawn goroutines and ends periodic
|
|
// or trigger routines.
|
|
func (g *Group) Cancel() {
|
|
g.cancel()
|
|
g.finish()
|
|
g.resetCtx()
|
|
}
|
|
|
|
// Finish will ends all periodic or polls routines. It will let
|
|
// currently running functions to finish (cancel is not sent).
|
|
//
|
|
// It is not safe to call Wait concurrently with any other method on g.
|
|
func (g *Group) Finish() {
|
|
g.finish()
|
|
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
|
|
}
|
|
|
|
// CancelAndWait cancels the context passed to any of the spawned goroutines and waits for all spawned
|
|
// goroutines to exit.
|
|
//
|
|
// It is not safe to call Wait concurrently with any other method on g.
|
|
func (g *Group) CancelAndWait() {
|
|
g.finish()
|
|
g.cancel()
|
|
g.wg.Wait()
|
|
g.resetCtx()
|
|
}
|
|
|
|
// WaitToFinish will ends all periodic or polls routines. It will wait for
|
|
// currently running functions to finish (cancel is not sent).
|
|
//
|
|
// It is not safe to call Wait concurrently with any other method on g.
|
|
func (g *Group) WaitToFinish() {
|
|
g.finish()
|
|
g.wg.Wait()
|
|
g.jobCtx, g.finish = context.WithCancel(g.baseCtx)
|
|
}
|
|
|
|
func (g *Group) handlePanic() {
|
|
if g.panicHandler != nil {
|
|
g.panicHandler.HandlePanic()
|
|
}
|
|
}
|