forked from Silverfish/proton-bridge
Other: Fix goroutine leaks in sync tests
Add missing Close calls. Properly handle nil channel for `user.startSync`. This patch also updated liteapi and Gluon to latest master and dev version respectively.
This commit is contained in:
committed by
James Houlahan
parent
6fdc8bd379
commit
6bbaf03f1f
@ -93,6 +93,8 @@ type Bridge struct {
|
||||
|
||||
// stopCh is used to stop ongoing goroutines when the bridge is closed.
|
||||
stopCh chan struct{}
|
||||
|
||||
closeEventChFn func()
|
||||
}
|
||||
|
||||
// New creates a new bridge.
|
||||
@ -165,7 +167,9 @@ func New( //nolint:funlen
|
||||
)
|
||||
|
||||
// Get an event channel for all events (individual events can be subscribed to later).
|
||||
eventCh, _ := bridge.GetEvents()
|
||||
eventCh, closeFn := bridge.GetEvents()
|
||||
|
||||
bridge.closeEventChFn = closeFn
|
||||
|
||||
if err := bridge.init(tlsReporter); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize bridge: %w", err)
|
||||
@ -303,6 +307,14 @@ func (bridge *Bridge) GetErrors() []error {
|
||||
}
|
||||
|
||||
func (bridge *Bridge) Close(ctx context.Context) error {
|
||||
defer func() {
|
||||
if bridge.closeEventChFn != nil {
|
||||
bridge.closeEventChFn()
|
||||
}
|
||||
|
||||
bridge.closeEventChFn = nil
|
||||
}()
|
||||
|
||||
// Stop ongoing operations such as connectivity checks.
|
||||
close(bridge.stopCh)
|
||||
|
||||
@ -356,6 +368,7 @@ func (bridge *Bridge) addWatcher(ofType ...events.Event) *watcher.Watcher[events
|
||||
}
|
||||
|
||||
func (bridge *Bridge) remWatcher(oldWatcher *watcher.Watcher[events.Event]) {
|
||||
oldWatcher.Close()
|
||||
bridge.watchers.Delete(oldWatcher)
|
||||
}
|
||||
|
||||
|
||||
@ -25,6 +25,8 @@ import (
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"go.uber.org/goleak"
|
||||
|
||||
"github.com/ProtonMail/proton-bridge/v2/internal/bridge"
|
||||
"github.com/ProtonMail/proton-bridge/v2/internal/events"
|
||||
"github.com/bradenaw/juniper/iterator"
|
||||
@ -36,6 +38,8 @@ import (
|
||||
)
|
||||
|
||||
func TestBridge_Sync(t *testing.T) {
|
||||
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
|
||||
|
||||
s := server.New()
|
||||
defer s.Close()
|
||||
|
||||
@ -56,6 +60,7 @@ func TestBridge_Sync(t *testing.T) {
|
||||
liteapi.WithTransport(liteapi.InsecureTransport()),
|
||||
).NewClientWithLogin(ctx, "imap", password)
|
||||
require.NoError(t, err)
|
||||
defer c.Close()
|
||||
|
||||
user, err := c.GetUser(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -394,6 +394,9 @@ func (bridge *Bridge) addUserWithVault(
|
||||
return fmt.Errorf("failed to create user: %w", err)
|
||||
}
|
||||
|
||||
if bridge.users.Has(apiUser.ID) {
|
||||
panic("double add")
|
||||
}
|
||||
bridge.users.Set(apiUser.ID, user)
|
||||
|
||||
// Connect the user's address(es) to gluon.
|
||||
|
||||
@ -154,7 +154,7 @@ func (user *User) handleDeleteAddressEvent(_ context.Context, event liteapi.Addr
|
||||
|
||||
if ok := user.updateCh.GetDelete(event.ID, func(updateCh *queue.QueuedChannel[imap.Update]) {
|
||||
if user.vault.AddressMode() == vault.SplitMode {
|
||||
updateCh.Close()
|
||||
updateCh.CloseAndDiscardQueued()
|
||||
}
|
||||
}); !ok {
|
||||
return fmt.Errorf("no such address %q", event.ID)
|
||||
|
||||
@ -94,21 +94,19 @@ func getAddrID(apiAddrs []liteapi.Address, email string) (string, error) {
|
||||
}
|
||||
|
||||
// contextWithStopCh returns a new context that is cancelled when the stop channel is closed or a value is sent to it.
|
||||
func contextWithStopCh(ctx context.Context, stopCh ...<-chan struct{}) (context.Context, context.CancelFunc) {
|
||||
func contextWithStopCh(ctx context.Context, channels ...<-chan struct{}) (context.Context, context.CancelFunc) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
for _, stopCh := range stopCh {
|
||||
stopCh := stopCh
|
||||
|
||||
go func() {
|
||||
for _, stopCh := range channels {
|
||||
go func(ch <-chan struct{}) {
|
||||
select {
|
||||
case <-stopCh:
|
||||
case <-ch:
|
||||
cancel()
|
||||
|
||||
case <-ctx.Done():
|
||||
// ...
|
||||
}
|
||||
}()
|
||||
}(stopCh)
|
||||
}
|
||||
|
||||
return ctx, cancel
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -55,6 +56,7 @@ type User struct {
|
||||
|
||||
syncStopCh chan struct{}
|
||||
syncLock try.Group
|
||||
syncWG sync.WaitGroup
|
||||
|
||||
showAllMail int32
|
||||
}
|
||||
@ -135,10 +137,13 @@ func New(ctx context.Context, encVault *vault.User, client *liteapi.Client, apiU
|
||||
// GODT-1946 - Don't start the event loop until the initial sync has finished.
|
||||
eventCh := user.client.NewEventStream(EventPeriod, EventJitter, user.vault.EventID())
|
||||
|
||||
user.syncWG.Add(1)
|
||||
// If we haven't synced yet, do it first.
|
||||
// If it fails, we don't start the event loop.
|
||||
// Otherwise, begin processing API events, logging any errors that occur.
|
||||
go func() {
|
||||
defer user.syncWG.Done()
|
||||
|
||||
if err := <-user.startSync(); err != nil {
|
||||
return
|
||||
}
|
||||
@ -200,7 +205,7 @@ func (user *User) SetAddressMode(ctx context.Context, mode vault.AddressMode) er
|
||||
|
||||
user.updateCh.Values(func(updateCh []*queue.QueuedChannel[imap.Update]) {
|
||||
for _, updateCh := range xslices.Unique(updateCh) {
|
||||
updateCh.Close()
|
||||
updateCh.CloseAndDiscardQueued()
|
||||
}
|
||||
})
|
||||
|
||||
@ -393,6 +398,8 @@ func (user *User) Logout(ctx context.Context) error {
|
||||
|
||||
// Close closes ongoing connections and cleans up resources.
|
||||
func (user *User) Close() error {
|
||||
defer user.syncWG.Wait()
|
||||
|
||||
// Close any ongoing operations.
|
||||
close(user.stopCh)
|
||||
|
||||
@ -405,12 +412,12 @@ func (user *User) Close() error {
|
||||
// Close the user's update channels.
|
||||
user.updateCh.Values(func(updateCh []*queue.QueuedChannel[imap.Update]) {
|
||||
for _, updateCh := range xslices.Unique(updateCh) {
|
||||
updateCh.Close()
|
||||
updateCh.CloseAndDiscardQueued()
|
||||
}
|
||||
})
|
||||
|
||||
// Close the user's notify channel.
|
||||
user.eventCh.Close()
|
||||
user.eventCh.CloseAndDiscardQueued()
|
||||
|
||||
// Close the user's vault.
|
||||
if err := user.vault.Close(); err != nil {
|
||||
@ -462,16 +469,16 @@ func (user *User) streamEvents(eventCh <-chan liteapi.Event) <-chan error {
|
||||
|
||||
// startSync begins a startSync for the user.
|
||||
func (user *User) startSync() <-chan error {
|
||||
if user.vault.SyncStatus().IsComplete() {
|
||||
logrus.Debug("Already synced, skipping")
|
||||
return nil
|
||||
}
|
||||
|
||||
errCh := make(chan error)
|
||||
|
||||
user.syncLock.GoTry(func(ok bool) {
|
||||
defer close(errCh)
|
||||
|
||||
if user.vault.SyncStatus().IsComplete() {
|
||||
logrus.Debug("Already synced, skipping")
|
||||
return
|
||||
}
|
||||
|
||||
if !ok {
|
||||
logrus.Debug("Sync already in progress, skipping")
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user