Other: Fix IMAP/SMTP/Login leaks/race conditions
Depending on the timing of bridge closure, it was possible for the IMAP/SMTP servers not to have started serving yet. By running these servers in a cancelable goroutine group (*xsync.Group), we mitigate this issue. Further, depending on internet disconnection timing during user login, it was possible for a user to be left improperly logged in; this change fixes that and adds test coverage for it. Lastly, depending on timing, certain background tasks (update check, connectivity ping) could be improperly started or never closed. This change moves them into the *xsync.Group as well so they are closed properly.
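For context, a minimal standalone sketch of the *xsync.Group pattern this change adopts (from github.com/bradenaw/juniper/xsync) might look like the following. It uses only the NewGroup, Once, PeriodicOrTrigger, and Wait calls that appear in the diff below; the task bodies are placeholders, and the sketch assumes, as the Close() hunk below relies on, that Wait cancels the group's context before blocking:

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/bradenaw/juniper/xsync"
    )

    func main() {
        // All background work hangs off a single cancelable group.
        tasks := xsync.NewGroup(context.Background())

        // A one-shot task that runs until the group's context is canceled,
        // standing in here for a server loop or an event-forwarding loop.
        tasks.Once(func(ctx context.Context) {
            <-ctx.Done()
            fmt.Println("server loop stopped")
        })

        // A task that runs periodically and can also be triggered on demand,
        // standing in here for the update check; calling checkNow runs it immediately.
        checkNow := tasks.PeriodicOrTrigger(time.Minute, 0, func(ctx context.Context) {
            fmt.Println("checking for updates")
        })
        checkNow()

        time.Sleep(100 * time.Millisecond)

        // At shutdown, a single Wait is expected to cancel the group's context
        // and block until every task has returned, so nothing is left running.
        tasks.Wait()
    }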
@@ -24,18 +24,22 @@ import (
    "fmt"
    "net"
    "net/http"
    "sync"
    "time"

    "github.com/Masterminds/semver/v3"
    "github.com/ProtonMail/gluon"
    imapEvents "github.com/ProtonMail/gluon/events"
    "github.com/ProtonMail/gluon/watcher"
    "github.com/ProtonMail/proton-bridge/v2/internal/async"
    "github.com/ProtonMail/proton-bridge/v2/internal/constants"
    "github.com/ProtonMail/proton-bridge/v2/internal/events"
    "github.com/ProtonMail/proton-bridge/v2/internal/focus"
    "github.com/ProtonMail/proton-bridge/v2/internal/safe"
    "github.com/ProtonMail/proton-bridge/v2/internal/try"
    "github.com/ProtonMail/proton-bridge/v2/internal/user"
    "github.com/ProtonMail/proton-bridge/v2/internal/vault"
    "github.com/bradenaw/juniper/xslices"
    "github.com/bradenaw/juniper/xsync"
    "github.com/emersion/go-smtp"
    "github.com/go-resty/resty/v2"
    "github.com/sirupsen/logrus"
@@ -48,8 +52,7 @@ type Bridge struct {

    // users holds authorized users.
    users *safe.Map[string, *user.User]
    loadCh chan struct{}
    loadWG try.Group
    goLoad func()

    // api manages user API clients.
    api *liteapi.Manager
@@ -62,14 +65,16 @@ type Bridge struct {
    // imapServer is the bridge's IMAP server.
    imapServer *gluon.Server
    imapListener net.Listener
    imapEventCh chan imapEvents.Event

    // smtpServer is the bridge's SMTP server.
    smtpServer *smtp.Server
    smtpServer *smtp.Server
    smtpListener net.Listener

    // updater is the bridge's updater.
    updater Updater
    curVersion *semver.Version
    updateCheckCh chan struct{}
    updater Updater
    goUpdate func()
    curVersion *semver.Version

    // focusService is used to raise the bridge window when needed.
    focusService *focus.Service
@@ -81,7 +86,8 @@ type Bridge struct {
    locator Locator

    // watchers holds all registered event watchers.
    watchers *safe.Slice[*watcher.Watcher[events.Event]]
    watchers []*watcher.Watcher[events.Event]
    watchersLock sync.RWMutex

    // errors contains errors encountered during startup.
    errors []error
@@ -91,10 +97,8 @@ type Bridge struct {
    logIMAPServer bool
    logSMTP bool

    // stopCh is used to stop ongoing goroutines when the bridge is closed.
    stopCh chan struct{}

    closeEventChFn func()
    // tasks manages the bridge's goroutines.
    tasks *xsync.Group
}

// New creates a new bridge.
@@ -115,6 +119,7 @@ func New( //nolint:funlen
    logIMAPClient, logIMAPServer bool, // whether to log IMAP client/server activity
    logSMTP bool, // whether to log SMTP activity
) (*Bridge, <-chan events.Event, error) {
    // api is the user's API manager.
    api := liteapi.New(
        liteapi.WithHostURL(apiURL),
        liteapi.WithAppVersion(constants.AppVersion(curVersion.Original())),
@@ -122,63 +127,63 @@ func New( //nolint:funlen
        liteapi.WithTransport(roundTripper),
    )

    tlsConfig, err := loadTLSConfig(vault)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to load TLS config: %w", err)
    }
    // tasks holds all the bridge's background tasks.
    tasks := xsync.NewGroup(context.Background())

    gluonDir, err := getGluonDir(vault)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to get Gluon directory: %w", err)
    }
    // imapEventCh forwards IMAP events from gluon instances to the bridge for processing.
    imapEventCh := make(chan imapEvents.Event)

    imapServer, err := newIMAPServer(gluonDir, curVersion, tlsConfig, logIMAPClient, logIMAPServer)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to create IMAP server: %w", err)
    }
    // users holds all the bridge's users.
    users := safe.NewMap[string, *user.User](nil)

    focusService, err := focus.NewService(curVersion)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to create focus service: %w", err)
    }
    // bridge is the bridge.
    bridge, err := newBridge(
        users,
        tasks,
        imapEventCh,

    bridge := newBridge(
        // App stuff
        locator,
        vault,
        autostarter,
        updater,
        curVersion,

        // API stuff
        api,
        identifier,
        proxyCtl,

        // Service stuff
        tlsConfig,
        imapServer,
        focusService,

        // Logging stuff
        logIMAPClient,
        logIMAPServer,
        logSMTP,
        logIMAPClient, logIMAPServer, logSMTP,
    )
    if err != nil {
        return nil, nil, fmt.Errorf("failed to create bridge: %w", err)
    }

    // Get an event channel for all events (individual events can be subscribed to later).
    eventCh, closeFn := bridge.GetEvents()

    bridge.closeEventChFn = closeFn
    eventCh, _ := bridge.GetEvents()

    // Initialize all of bridge's background tasks and operations.
    if err := bridge.init(tlsReporter); err != nil {
        return nil, nil, fmt.Errorf("failed to initialize bridge: %w", err)
    }

    // Start serving IMAP.
    if err := bridge.serveIMAP(); err != nil {
        bridge.PushError(ErrServeIMAP)
    }

    // Start serving SMTP.
    if err := bridge.serveSMTP(); err != nil {
        bridge.PushError(ErrServeSMTP)
    }

    return bridge, eventCh, nil
}

// nolint:funlen
func newBridge(
    users *safe.Map[string, *user.User],
    tasks *xsync.Group,
    imapEventCh chan imapEvents.Event,

    locator Locator,
    vault *vault.Vault,
    autostarter Autostarter,
@@ -189,113 +194,146 @@ func newBridge(
    identifier Identifier,
    proxyCtl ProxyController,

    tlsConfig *tls.Config,
    imapServer *gluon.Server,
    focusService *focus.Service,
    logIMAPClient, logIMAPServer, logSMTP bool,
) *Bridge {
    bridge := &Bridge{
        vault: vault,
) (*Bridge, error) {
    tlsConfig, err := loadTLSConfig(vault)
    if err != nil {
        return nil, fmt.Errorf("failed to load TLS config: %w", err)
    }

        users: safe.NewMap[string, *user.User](nil),
        loadCh: make(chan struct{}, 1),
    gluonDir, err := getGluonDir(vault)
    if err != nil {
        return nil, fmt.Errorf("failed to get Gluon directory: %w", err)
    }

    imapServer, err := newIMAPServer(
        gluonDir,
        curVersion,
        tlsConfig,
        logIMAPClient,
        logIMAPServer,
        imapEventCh,
        tasks,
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create IMAP server: %w", err)
    }

    focusService, err := focus.NewService(curVersion)
    if err != nil {
        return nil, fmt.Errorf("failed to create focus service: %w", err)
    }

    return &Bridge{
        vault: vault,
        users: users,

        api: api,
        proxyCtl: proxyCtl,
        identifier: identifier,

        tlsConfig: tlsConfig,
        imapServer: imapServer,
        tlsConfig: tlsConfig,
        imapServer: imapServer,
        imapEventCh: imapEventCh,
        smtpServer: newSMTPServer(users, tlsConfig, logSMTP),

        updater: updater,
        curVersion: curVersion,
        updateCheckCh: make(chan struct{}, 1),
        updater: updater,
        curVersion: curVersion,

        focusService: focusService,
        autostarter: autostarter,
        locator: locator,

        watchers: safe.NewSlice[*watcher.Watcher[events.Event]](),

        logIMAPClient: logIMAPClient,
        logIMAPServer: logIMAPServer,
        logSMTP: logSMTP,

        stopCh: make(chan struct{}),
    }

    bridge.smtpServer = newSMTPServer(&smtpBackend{bridge}, tlsConfig, logSMTP)

    return bridge
        tasks: tasks,
    }, nil
}

// nolint:funlen
func (bridge *Bridge) init(tlsReporter TLSReporter) error {
    // Enable or disable the proxy at startup.
    if bridge.vault.GetProxyAllowed() {
        bridge.proxyCtl.AllowProxy()
    } else {
        bridge.proxyCtl.DisallowProxy()
    }

    // Handle connection up/down events.
    bridge.api.AddStatusObserver(func(status liteapi.Status) {
        switch {
        case status == liteapi.StatusUp:
            go bridge.onStatusUp()
            bridge.onStatusUp()

        case status == liteapi.StatusDown:
            go bridge.onStatusDown()
            bridge.onStatusDown()
        }
    })

    // If any call returns a bad version code, we need to update.
    bridge.api.AddErrorHandler(liteapi.AppVersionBadCode, func() {
        bridge.publish(events.UpdateForced{})
    })

    // Ensure all outgoing headers have the correct user agent.
    bridge.api.AddPreRequestHook(func(_ *resty.Client, req *resty.Request) error {
        req.SetHeader("User-Agent", bridge.identifier.GetUserAgent())
        return nil
    })

    go func() {
        for range tlsReporter.GetTLSIssueCh() {
    // Publish a TLS issue event if a TLS issue is encountered.
    bridge.tasks.Once(func(ctx context.Context) {
        async.RangeContext(ctx, tlsReporter.GetTLSIssueCh(), func(struct{}) {
            bridge.publish(events.TLSIssue{})
        }
    }()
        })
    })

    go func() {
        for range bridge.focusService.GetRaiseCh() {
    // Publish a raise event if the focus service is called.
    bridge.tasks.Once(func(ctx context.Context) {
        async.RangeContext(ctx, bridge.focusService.GetRaiseCh(), func(struct{}) {
            bridge.publish(events.Raise{})
        }
    }()
        })
    })

    go func() {
        for event := range bridge.imapServer.AddWatcher() {
    // Handle any IMAP events that are forwarded to the bridge from gluon.
    bridge.tasks.Once(func(ctx context.Context) {
        async.RangeContext(ctx, bridge.imapEventCh, func(event imapEvents.Event) {
            bridge.handleIMAPEvent(event)
        })
    })

    // Attempt to lazy load users when triggered.
    bridge.goLoad = bridge.tasks.Trigger(func(ctx context.Context) {
        if err := bridge.loadUsers(ctx); err != nil {
            logrus.WithError(err).Error("Failed to load users")
        } else {
            bridge.publish(events.AllUsersLoaded{})
        }
    }()
    })
    defer bridge.goLoad()

    if err := bridge.serveIMAP(); err != nil {
        bridge.PushError(ErrServeIMAP)
    }

    if err := bridge.serveSMTP(); err != nil {
        bridge.PushError(ErrServeSMTP)
    }

    if err := bridge.watchForUpdates(); err != nil {
        bridge.PushError(ErrWatchUpdates)
    }

    go bridge.loadLoop()
    // Check for updates when triggered.
    bridge.goUpdate = bridge.tasks.PeriodicOrTrigger(constants.UpdateCheckInterval, 0, func(ctx context.Context) {
        version, err := bridge.updater.GetVersionInfo(bridge.api, bridge.vault.GetUpdateChannel())
        if err != nil {
            logrus.WithError(err).Error("Failed to get version info")
        } else if err := bridge.handleUpdate(version); err != nil {
            logrus.WithError(err).Error("Failed to handle update")
        }
    })
    defer bridge.goUpdate()

    return nil
}

// GetEvents returns a channel of events of the given type.
// If no types are supplied, all events are returned.
func (bridge *Bridge) GetEvents(ofType ...events.Event) (<-chan events.Event, func()) {
    newWatcher := bridge.addWatcher(ofType...)
func (bridge *Bridge) GetEvents(ofType ...events.Event) (<-chan events.Event, context.CancelFunc) {
    watcher := bridge.addWatcher(ofType...)

    return newWatcher.GetChannel(), func() { bridge.remWatcher(newWatcher) }
    return watcher.GetChannel(), func() { bridge.remWatcher(watcher) }
}

func (bridge *Bridge) PushError(err error) {
@@ -307,20 +345,6 @@ func (bridge *Bridge) GetErrors() []error {
}

func (bridge *Bridge) Close(ctx context.Context) error {
    defer func() {
        if bridge.closeEventChFn != nil {
            bridge.closeEventChFn()
        }

        bridge.closeEventChFn = nil
    }()

    // Stop ongoing operations such as connectivity checks.
    close(bridge.stopCh)

    // Wait for ongoing user load operations to finish.
    bridge.loadWG.Wait()

    // Close the IMAP server.
    if err := bridge.closeIMAP(ctx); err != nil {
        logrus.WithError(err).Error("Failed to close IMAP server")
@@ -336,9 +360,22 @@ func (bridge *Bridge) Close(ctx context.Context) error {
        user.Close()
    })

    // Stop all ongoing tasks.
    bridge.tasks.Wait()

    // Close the focus service.
    bridge.focusService.Close()

    // Close the watchers.
    bridge.watchersLock.Lock()
    defer bridge.watchersLock.Unlock()

    for _, watcher := range bridge.watchers {
        watcher.Close()
    }

    bridge.watchers = nil

    // Save the last version of bridge that was run.
    if err := bridge.vault.SetLastVersion(bridge.curVersion); err != nil {
        logrus.WithError(err).Error("Failed to save last version")
@@ -348,35 +385,51 @@ func (bridge *Bridge) Close(ctx context.Context) error {
}

func (bridge *Bridge) publish(event events.Event) {
    bridge.watchers.Iter(func(watcher *watcher.Watcher[events.Event]) {
    bridge.watchersLock.RLock()
    defer bridge.watchersLock.RUnlock()

    for _, watcher := range bridge.watchers {
        if watcher.IsWatching(event) {
            if ok := watcher.Send(event); !ok {
                logrus.WithField("event", event).Warn("Failed to send event to watcher")
            }
        }
    })
    }
}

func (bridge *Bridge) addWatcher(ofType ...events.Event) *watcher.Watcher[events.Event] {
    newWatcher := watcher.New(ofType...)
    bridge.watchersLock.Lock()
    defer bridge.watchersLock.Unlock()

    bridge.watchers.Append(newWatcher)
    watcher := watcher.New(ofType...)

    return newWatcher
    bridge.watchers = append(bridge.watchers, watcher)

    return watcher
}

func (bridge *Bridge) remWatcher(oldWatcher *watcher.Watcher[events.Event]) {
    oldWatcher.Close()
    bridge.watchers.Delete(oldWatcher)
func (bridge *Bridge) remWatcher(watcher *watcher.Watcher[events.Event]) {
    bridge.watchersLock.Lock()
    defer bridge.watchersLock.Unlock()

    idx := xslices.Index(bridge.watchers, watcher)

    if idx < 0 {
        return
    }

    bridge.watchers = append(bridge.watchers[:idx], bridge.watchers[idx+1:]...)

    watcher.Close()
}

func (bridge *Bridge) onStatusUp() {
    bridge.publish(events.ConnStatusUp{})

    bridge.loadCh <- struct{}{}
    bridge.goLoad()

    bridge.users.IterValues(func(user *user.User) {
        user.OnStatusUp()
        go user.OnStatusUp()
    })
}

@@ -384,35 +437,30 @@ func (bridge *Bridge) onStatusDown() {
    bridge.publish(events.ConnStatusDown{})

    bridge.users.IterValues(func(user *user.User) {
        user.OnStatusDown()
        go user.OnStatusDown()
    })

    upCh, done := bridge.GetEvents(events.ConnStatusUp{})
    defer done()
    bridge.tasks.Once(func(ctx context.Context) {
        backoff := time.Second

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
        for {
            select {
            case <-ctx.Done():
                return

    backoff := time.Second
            case <-time.After(backoff):
                if err := bridge.api.Ping(ctx); err != nil {
                    logrus.WithError(err).Debug("Failed to ping API, will retry")
                } else {
                    return
                }
            }

    for {
        select {
        case <-upCh:
            return

        case <-bridge.stopCh:
            return

        case <-time.After(backoff):
            if err := bridge.api.Ping(ctx); err != nil {
                logrus.WithError(err).Debug("Failed to ping API")
                if backoff < 30*time.Second {
                    backoff *= 2
                }
            }

            if backoff < 30*time.Second {
                backoff *= 2
            }
        }
    })
}

func loadTLSConfig(vault *vault.Vault) (*tls.Config, error) {