fix(GODT-2327): Better sleep (with context)
This commit is contained in:
@ -376,8 +376,6 @@ func (bridge *Bridge) init(tlsReporter TLSReporter) error {
|
|||||||
|
|
||||||
// Attempt to lazy load users when triggered.
|
// Attempt to lazy load users when triggered.
|
||||||
bridge.goLoad = bridge.tasks.Trigger(func(ctx context.Context) {
|
bridge.goLoad = bridge.tasks.Trigger(func(ctx context.Context) {
|
||||||
logrus.Info("Loading users")
|
|
||||||
|
|
||||||
if err := bridge.loadUsers(ctx); err != nil {
|
if err := bridge.loadUsers(ctx); err != nil {
|
||||||
logrus.WithError(err).Error("Failed to load users")
|
logrus.WithError(err).Error("Failed to load users")
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -330,17 +330,18 @@ func (bridge *Bridge) loginUser(ctx context.Context, client *proton.Client, auth
|
|||||||
// loadUsers tries to load each user in the vault that isn't already loaded.
|
// loadUsers tries to load each user in the vault that isn't already loaded.
|
||||||
func (bridge *Bridge) loadUsers(ctx context.Context) error {
|
func (bridge *Bridge) loadUsers(ctx context.Context) error {
|
||||||
logrus.WithField("count", len(bridge.vault.GetUserIDs())).Info("Loading users")
|
logrus.WithField("count", len(bridge.vault.GetUserIDs())).Info("Loading users")
|
||||||
|
defer logrus.Info("Finished loading users")
|
||||||
|
|
||||||
return bridge.vault.ForUser(runtime.NumCPU(), func(user *vault.User) error {
|
return bridge.vault.ForUser(runtime.NumCPU(), func(user *vault.User) error {
|
||||||
log := logrus.WithField("userID", user.UserID())
|
log := logrus.WithField("userID", user.UserID())
|
||||||
|
|
||||||
if user.AuthUID() == "" {
|
if user.AuthUID() == "" {
|
||||||
log.Info("Not loading disconnected user")
|
log.Info("User is not connected (skipping)")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if safe.RLockRet(func() bool { return mapHas(bridge.users, user.UserID()) }, bridge.usersLock) {
|
if safe.RLockRet(func() bool { return mapHas(bridge.users, user.UserID()) }, bridge.usersLock) {
|
||||||
log.Debug("Not loading already-loaded user")
|
log.Info("User is already loaded (skipping)")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -185,12 +185,12 @@ func New(
|
|||||||
|
|
||||||
// When triggered, sync the user and then begin streaming API events.
|
// When triggered, sync the user and then begin streaming API events.
|
||||||
user.goSync = user.tasks.Trigger(func(ctx context.Context) {
|
user.goSync = user.tasks.Trigger(func(ctx context.Context) {
|
||||||
user.log.Debug("Sync triggered")
|
user.log.Info("Sync triggered")
|
||||||
|
|
||||||
// Sync the user.
|
// Sync the user.
|
||||||
user.syncAbort.Do(ctx, func(ctx context.Context) {
|
user.syncAbort.Do(ctx, func(ctx context.Context) {
|
||||||
if user.vault.SyncStatus().IsComplete() {
|
if user.vault.SyncStatus().IsComplete() {
|
||||||
user.log.Debug("Sync already complete, skipping")
|
user.log.Info("Sync already complete, skipping")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,7 +200,7 @@ func New(
|
|||||||
return
|
return
|
||||||
} else if err := user.doSync(ctx); err != nil {
|
} else if err := user.doSync(ctx); err != nil {
|
||||||
user.log.WithError(err).Error("Failed to sync, will retry later")
|
user.log.WithError(err).Error("Failed to sync, will retry later")
|
||||||
time.Sleep(SyncRetryCooldown)
|
sleepCtx(ctx, SyncRetryCooldown)
|
||||||
} else {
|
} else {
|
||||||
user.log.Info("Sync complete, starting API event stream")
|
user.log.Info("Sync complete, starting API event stream")
|
||||||
return
|
return
|
||||||
@ -685,3 +685,11 @@ func b32(b bool) uint32 {
|
|||||||
|
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sleepCtx sleeps for the given duration, or until the context is canceled.
|
||||||
|
func sleepCtx(ctx context.Context, d time.Duration) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-time.After(d):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user