fix(GODT-2327): Delay event processing until gluon user exists

We don't want to start processing events until those events have
somewhere to be sent to.

Also, to be safe, ensure remove and re-add the gluon user while
clearing its sync status. This shouldn't be necessary.

fix(GODT-2327): Only start processing events once sync is finished

fix(GODT-2327): avoid windows delete all deadlock

fix(GODT-2327): Clear update channels whenever clearing sync status

fix(GODT-2327): Properly cancel event stream when handling refresh

fix(GODT-2327): Remove unnecessary sync abort call

fix(GODT-2327): Fix lint issue

fix(GODT-2327): Don't retry with abortable context because it's canceled

fix(GODT-2327): Loop to retry until sync has complete

fix(GODT-2327): Better sleep (with context)
This commit is contained in:
James Houlahan
2023-02-03 15:06:27 +01:00
parent 29072f0285
commit f3c5e300cd
7 changed files with 125 additions and 64 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.18
require (
github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557
github.com/Masterminds/semver/v3 v3.1.1
github.com/ProtonMail/gluon v0.14.2-0.20230206091703-4a3d7a57eeae
github.com/ProtonMail/gluon v0.14.2-0.20230206162331-cf36d870802b
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a
github.com/ProtonMail/go-proton-api v0.3.1-0.20230203120457-1849bf7d578b
github.com/ProtonMail/go-rfc5322 v0.11.0

4
go.sum
View File

@ -28,8 +28,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs
github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo=
github.com/ProtonMail/docker-credential-helpers v1.1.0 h1:+kvUIpwWcbtP3WFv5sSvkFn/XLzSqPOB5AAthuk9xPk=
github.com/ProtonMail/docker-credential-helpers v1.1.0/go.mod h1:mK0aBveCxhnQ756AmaTfXMZDeULvheYVhF/MWMErN5g=
github.com/ProtonMail/gluon v0.14.2-0.20230206091703-4a3d7a57eeae h1:iJ0CgJEZTBRGX+vwmMrIg2HEGo3+qBJD/PPcDvYqzQg=
github.com/ProtonMail/gluon v0.14.2-0.20230206091703-4a3d7a57eeae/go.mod h1:HYHr7hG7LPWI1S50M8NfHRb1kYi5B+Yu4/N/H+y+JUY=
github.com/ProtonMail/gluon v0.14.2-0.20230206162331-cf36d870802b h1:v/XwH5Em8gFSpJQErhSCt0XAsIxojFxgrVcfPUEWH7I=
github.com/ProtonMail/gluon v0.14.2-0.20230206162331-cf36d870802b/go.mod h1:HYHr7hG7LPWI1S50M8NfHRb1kYi5B+Yu4/N/H+y+JUY=
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4=
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4=
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo=

View File

@ -375,8 +375,6 @@ func (bridge *Bridge) init(tlsReporter TLSReporter) error {
// Attempt to load users from the vault when triggered.
bridge.goLoad = bridge.tasks.Trigger(func(ctx context.Context) {
logrus.Info("Loading users")
if err := bridge.loadUsers(ctx); err != nil {
logrus.WithError(err).Error("Failed to load users")
return

View File

@ -152,9 +152,22 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error {
// If the DB was newly created, clear the sync status; gluon's DB was not found.
logrus.Warn("IMAP user DB was newly created, clearing sync status")
// Remove the user from IMAP so we can clear the sync status.
if err := bridge.imapServer.RemoveUser(ctx, gluonID, false); err != nil {
return fmt.Errorf("failed to remove IMAP user: %w", err)
}
// Clear the sync status -- we need to resync all messages.
if err := user.ClearSyncStatus(); err != nil {
return fmt.Errorf("failed to clear sync status: %w", err)
}
// Add the user back to the IMAP server.
if isNew, err := bridge.imapServer.LoadUser(ctx, imapConn, gluonID, user.GluonKey()); err != nil {
return fmt.Errorf("failed to add IMAP user: %w", err)
} else if isNew {
panic("IMAP user should already have a database")
}
} else if status := user.GetSyncStatus(); !status.HasLabels {
// Otherwise, the DB already exists -- if the labels are not yet synced, we need to re-create the DB.
if err := bridge.imapServer.RemoveUser(ctx, gluonID, true); err != nil {
@ -192,7 +205,9 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error {
}
}
// Trigger a sync for the user, if needed.
user.TriggerSync()
return nil
}

View File

@ -330,17 +330,18 @@ func (bridge *Bridge) loginUser(ctx context.Context, client *proton.Client, auth
// loadUsers tries to load each user in the vault that isn't already loaded.
func (bridge *Bridge) loadUsers(ctx context.Context) error {
logrus.WithField("count", len(bridge.vault.GetUserIDs())).Info("Loading users")
defer logrus.Info("Finished loading users")
return bridge.vault.ForUser(runtime.NumCPU(), func(user *vault.User) error {
log := logrus.WithField("userID", user.UserID())
if user.AuthUID() == "" {
log.Info("Not loading disconnected user")
log.Info("User is not connected (skipping)")
return nil
}
if safe.RLockRet(func() bool { return mapHas(bridge.users, user.UserID()) }, bridge.usersLock) {
log.Debug("Not loading already-loaded user")
log.Info("User is already loaded (skipping)")
return nil
}

View File

@ -90,8 +90,10 @@ func (user *User) handleRefreshEvent(ctx context.Context, refresh proton.Refresh
l.WithError(err).Error("Failed to report refresh to sentry")
}
// Cancel and restart ongoing syncs.
user.abortable.Abort()
// Cancel the event stream once this refresh is done.
defer user.pollAbort.Abort()
// Resync after the refresh.
defer user.goSync()
return safe.LockRet(func() error {
@ -118,11 +120,8 @@ func (user *User) handleRefreshEvent(ctx context.Context, refresh proton.Refresh
user.apiAddrs = groupBy(apiAddrs, func(addr proton.Address) string { return addr.ID })
user.apiLabels = groupBy(apiLabels, func(label proton.Label) string { return label.ID })
// Reinitialize the update channels.
user.initUpdateCh(user.vault.AddressMode())
// Clear sync status; we want to sync everything again.
if err := user.vault.ClearSyncStatus(); err != nil {
if err := user.clearSyncStatus(); err != nil {
return fmt.Errorf("failed to clear sync status: %w", err)
}

View File

@ -78,7 +78,8 @@ type User struct {
updateChLock safe.RWMutex
tasks *async.Group
abortable async.Abortable
syncAbort async.Abortable
pollAbort async.Abortable
goSync func()
pollAPIEventsCh chan chan struct{}
@ -171,42 +172,6 @@ func New(
return nil
})
// Stream events from the API, logging any errors that occur.
// This does nothing until the sync has been marked as complete.
// When we receive an API event, we attempt to handle it.
// If successful, we update the event ID in the vault.
user.tasks.Once(func(ctx context.Context) {
ticker := proton.NewTicker(EventPeriod, EventJitter)
defer ticker.Stop()
for {
var doneCh chan struct{}
select {
case <-ctx.Done():
return
case doneCh = <-user.pollAPIEventsCh:
// ...
case <-ticker.C:
// ...
}
user.log.Debug("Event poll triggered")
if !user.vault.SyncStatus().IsComplete() {
user.log.Debug("Sync is incomplete, skipping event poll")
} else if err := user.doEventPoll(ctx); err != nil {
user.log.WithError(err).Error("Failed to poll events")
}
if doneCh != nil {
close(doneCh)
}
}
})
// When triggered, poll the API for events, optionally blocking until the poll is complete.
user.goPollAPIEvents = func(wait bool) {
doneCh := make(chan struct{})
@ -218,18 +183,37 @@ func New(
}
}
// When triggered, attempt to sync the user.
// When triggered, sync the user and then begin streaming API events.
user.goSync = user.tasks.Trigger(func(ctx context.Context) {
user.log.Debug("Sync triggered")
user.log.Info("Sync triggered")
user.abortable.Do(ctx, func(ctx context.Context) {
// Sync the user.
user.syncAbort.Do(ctx, func(ctx context.Context) {
if user.vault.SyncStatus().IsComplete() {
user.log.Debug("Sync is already complete, skipping")
user.log.Info("Sync already complete, skipping")
return
}
for {
if err := ctx.Err(); err != nil {
user.log.WithError(err).Error("Sync aborted")
return
} else if err := user.doSync(ctx); err != nil {
user.log.WithError(err).Error("Failed to sync, will retry later")
time.AfterFunc(SyncRetryCooldown, user.goSync)
sleepCtx(ctx, SyncRetryCooldown)
} else {
user.log.Info("Sync complete, starting API event stream")
return
}
}
})
// Once we know the sync has completed, we can start polling for API events.
if user.vault.SyncStatus().IsComplete() {
user.pollAbort.Do(ctx, func(ctx context.Context) {
user.startEvents(ctx)
})
}
})
return user, nil
@ -295,22 +279,21 @@ func (user *User) GetAddressMode() vault.AddressMode {
func (user *User) SetAddressMode(_ context.Context, mode vault.AddressMode) error {
user.log.WithField("mode", mode).Info("Setting address mode")
user.abortable.Abort()
user.syncAbort.Abort()
user.pollAbort.Abort()
defer user.goSync()
return safe.LockRet(func() error {
user.initUpdateCh(mode)
if err := user.vault.SetAddressMode(mode); err != nil {
return fmt.Errorf("failed to set address mode: %w", err)
}
if err := user.vault.ClearSyncStatus(); err != nil {
if err := user.clearSyncStatus(); err != nil {
return fmt.Errorf("failed to clear sync status: %w", err)
}
return nil
}, user.apiAddrsLock, user.updateChLock)
}, user.eventLock, user.apiAddrsLock, user.updateChLock)
}
// SetShowAllMail sets whether to show the All Mail mailbox.
@ -486,7 +469,8 @@ func (user *User) OnStatusUp(context.Context) {
func (user *User) OnStatusDown(context.Context) {
user.log.Info("Connection is down")
user.abortable.Abort()
user.syncAbort.Abort()
user.pollAbort.Abort()
}
// GetSyncStatus returns the sync status of the user.
@ -495,8 +479,30 @@ func (user *User) GetSyncStatus() vault.SyncStatus {
}
// ClearSyncStatus clears the sync status of the user.
// This also drops any updates in the update channel(s).
// Warning: the gluon user must be removed and re-added if this happens!
func (user *User) ClearSyncStatus() error {
return user.vault.ClearSyncStatus()
user.log.Info("Clearing sync status")
return safe.LockRet(func() error {
return user.clearSyncStatus()
}, user.eventLock, user.apiAddrsLock, user.updateChLock)
}
// clearSyncStatus clears the sync status of the user.
// This also drops any updates in the update channel(s).
// Warning: the gluon user must be removed and re-added if this happens!
// It is assumed that the eventLock, apiAddrsLock and updateChLock are already locked.
func (user *User) clearSyncStatus() error {
user.log.Info("Clearing sync status")
user.initUpdateCh(user.vault.AddressMode())
if err := user.vault.ClearSyncStatus(); err != nil {
return fmt.Errorf("failed to clear sync status: %w", err)
}
return nil
}
// Logout logs the user out from the API.
@ -574,6 +580,40 @@ func (user *User) initUpdateCh(mode vault.AddressMode) {
}
}
// startEvents streams events from the API, logging any errors that occur.
// This does nothing until the sync has been marked as complete.
// When we receive an API event, we attempt to handle it.
// If successful, we update the event ID in the vault.
func (user *User) startEvents(ctx context.Context) {
ticker := proton.NewTicker(EventPeriod, EventJitter)
defer ticker.Stop()
for {
var doneCh chan struct{}
select {
case <-ctx.Done():
return
case doneCh = <-user.pollAPIEventsCh:
// ...
case <-ticker.C:
// ...
}
user.log.Debug("Event poll triggered")
if err := user.doEventPoll(ctx); err != nil {
user.log.WithError(err).Error("Failed to poll events")
}
if doneCh != nil {
close(doneCh)
}
}
}
// doEventPoll is called whenever API events should be polled.
func (user *User) doEventPoll(ctx context.Context) error {
user.eventLock.Lock()
@ -643,3 +683,11 @@ func b32(b bool) uint32 {
return 0
}
// sleepCtx sleeps for the given duration, or until the context is canceled.
func sleepCtx(ctx context.Context, d time.Duration) {
select {
case <-ctx.Done():
case <-time.After(d):
}
}