fix(GODT-2812): Fix rare sync deadlock

Copy data rather than hold onto the locks while sync is ongoing. The
data in question does not change while the sync is ongoing and holding
on to the locks during a very long sync can create a deadlock with
due to some IMAP operation that needs to acquire one of those locks with
write access.
This commit is contained in:
Leander Beernaert
2023-07-26 18:12:06 +02:00
committed by Jakub
parent f1cf4ee194
commit f82965b825

View File

@ -113,77 +113,89 @@ func (user *User) doSync(ctx context.Context) error {
}
func (user *User) sync(ctx context.Context) error {
return safe.RLockRet(func() error {
return withAddrKRs(user.apiUser, user.apiAddrs, user.vault.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error {
if !user.vault.SyncStatus().HasLabels {
user.log.Info("Syncing labels")
var apiUser proton.User
var apiAddrs map[string]proton.Address
var apiLabels map[string]proton.Label
if err := syncLabels(ctx, user.apiLabels, xslices.Unique(maps.Values(user.updateCh))...); err != nil {
return fmt.Errorf("failed to sync labels: %w", err)
}
safe.RLock(func() {
// Make a copy of the user, labels and addresses. They don't change during sync and we don't need to
// keep holding on to these locks which are required for imap commands to succeed while this is going on.
apiUser = user.apiUser
apiAddrs = maps.Clone(user.apiAddrs)
apiLabels = maps.Clone(user.apiLabels)
}, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock)
if err := user.vault.SetHasLabels(true); err != nil {
return fmt.Errorf("failed to set has labels: %w", err)
}
// We can keep holding on to the updateCh as this is not used during regular imap processing, only with events.
user.updateChLock.RLock()
defer user.updateChLock.RUnlock()
user.log.Info("Synced labels")
} else {
user.log.Info("Labels are already synced, skipping")
return withAddrKRs(apiUser, apiAddrs, user.vault.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error {
if !user.vault.SyncStatus().HasLabels {
user.log.Info("Syncing labels")
if err := syncLabels(ctx, apiLabels, xslices.Unique(maps.Values(user.updateCh))...); err != nil {
return fmt.Errorf("failed to sync labels: %w", err)
}
if !user.vault.SyncStatus().HasMessages {
user.log.Info("Syncing messages")
// Determine which messages to sync.
messageIDs, err := user.client.GetMessageIDs(ctx, "")
if err != nil {
return fmt.Errorf("failed to get message IDs to sync: %w", err)
}
logrus.Debugf("User has the following failed synced message ids: %v", user.vault.SyncStatus().FailedMessageIDs)
// Remove any messages that have already failed to sync.
messageIDs = xslices.Filter(messageIDs, func(messageID string) bool {
return !slices.Contains(user.vault.SyncStatus().FailedMessageIDs, messageID)
})
// Reverse the order of the message IDs so that the newest messages are synced first.
xslices.Reverse(messageIDs)
// If we have a message ID that we've already synced, then we can skip all messages before it.
if idx := xslices.Index(messageIDs, user.vault.SyncStatus().LastMessageID); idx >= 0 {
messageIDs = messageIDs[idx+1:]
}
// Sync the messages.
if err := user.syncMessages(
ctx,
user.ID(),
messageIDs,
user.client,
user.reporter,
user.vault,
user.apiLabels,
addrKRs,
user.updateCh,
user.eventCh,
user.maxSyncMemory,
); err != nil {
return fmt.Errorf("failed to sync messages: %w", err)
}
if err := user.vault.SetHasMessages(true); err != nil {
return fmt.Errorf("failed to set has messages: %w", err)
}
user.log.Info("Synced messages")
} else {
user.log.Info("Messages are already synced, skipping")
if err := user.vault.SetHasLabels(true); err != nil {
return fmt.Errorf("failed to set has labels: %w", err)
}
return nil
})
}, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock)
user.log.Info("Synced labels")
} else {
user.log.Info("Labels are already synced, skipping")
}
if !user.vault.SyncStatus().HasMessages {
user.log.Info("Syncing messages")
// Determine which messages to sync.
messageIDs, err := user.client.GetMessageIDs(ctx, "")
if err != nil {
return fmt.Errorf("failed to get message IDs to sync: %w", err)
}
// Remove any messages that have already failed to sync.
messageIDs = xslices.Filter(messageIDs, func(messageID string) bool {
return !slices.Contains(user.vault.SyncStatus().FailedMessageIDs, messageID)
})
// Reverse the order of the message IDs so that the newest messages are synced first.
xslices.Reverse(messageIDs)
// If we have a message ID that we've already synced, then we can skip all messages before it.
if idx := xslices.Index(messageIDs, user.vault.SyncStatus().LastMessageID); idx >= 0 {
messageIDs = messageIDs[idx+1:]
}
// Sync the messages.
if err := user.syncMessages(
ctx,
user.ID(),
messageIDs,
user.client,
user.reporter,
user.vault,
apiLabels,
addrKRs,
user.updateCh,
user.eventCh,
user.maxSyncMemory,
); err != nil {
return fmt.Errorf("failed to sync messages: %w", err)
}
if err := user.vault.SetHasMessages(true); err != nil {
return fmt.Errorf("failed to set has messages: %w", err)
}
user.log.Info("Synced messages")
} else {
user.log.Info("Messages are already synced, skipping")
}
return nil
})
}
// nolint:exhaustive