forked from Silverfish/proton-bridge
fix(GODT-2626): Handle rare crash due to missing address update ch
Ensure that we can handle the rare case that can cause a crash if for whichever reason we end up with an Address Delete and Message Create/Update in the same event object.
This commit is contained in:
@ -217,7 +217,7 @@ func (user *User) handleCreateAddressEvent(ctx context.Context, event proton.Add
|
|||||||
// If the address is enabled, we need to hook it up to the update channels.
|
// If the address is enabled, we need to hook it up to the update channels.
|
||||||
switch user.vault.AddressMode() {
|
switch user.vault.AddressMode() {
|
||||||
case vault.CombinedMode:
|
case vault.CombinedMode:
|
||||||
primAddr, err := getAddrIdx(user.apiAddrs, 0)
|
primAddr, err := getPrimaryAddr(user.apiAddrs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get primary address: %w", err)
|
return fmt.Errorf("failed to get primary address: %w", err)
|
||||||
}
|
}
|
||||||
@ -276,7 +276,7 @@ func (user *User) handleUpdateAddressEvent(_ context.Context, event proton.Addre
|
|||||||
case oldAddr.Status != proton.AddressStatusEnabled && event.Address.Status == proton.AddressStatusEnabled:
|
case oldAddr.Status != proton.AddressStatusEnabled && event.Address.Status == proton.AddressStatusEnabled:
|
||||||
switch user.vault.AddressMode() {
|
switch user.vault.AddressMode() {
|
||||||
case vault.CombinedMode:
|
case vault.CombinedMode:
|
||||||
primAddr, err := getAddrIdx(user.apiAddrs, 0)
|
primAddr, err := getPrimaryAddr(user.apiAddrs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get primary address: %w", err)
|
return fmt.Errorf("failed to get primary address: %w", err)
|
||||||
}
|
}
|
||||||
@ -628,7 +628,14 @@ func (user *User) handleCreateMessageEvent(ctx context.Context, message proton.M
|
|||||||
}
|
}
|
||||||
|
|
||||||
update = imap.NewMessagesCreated(false, res.update)
|
update = imap.NewMessagesCreated(false, res.update)
|
||||||
user.updateCh[full.AddressID].Enqueue(update)
|
didPublish, err := safePublishMessageUpdate(user, full.AddressID, update)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !didPublish {
|
||||||
|
update = nil
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
@ -674,7 +681,14 @@ func (user *User) handleUpdateMessageEvent(_ context.Context, message proton.Mes
|
|||||||
flags,
|
flags,
|
||||||
)
|
)
|
||||||
|
|
||||||
user.updateCh[message.AddressID].Enqueue(update)
|
didPublish, err := safePublishMessageUpdate(user, message.AddressID, update)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !didPublish {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
return []imap.Update{update}, nil
|
return []imap.Update{update}, nil
|
||||||
}, user.apiLabelsLock, user.updateChLock)
|
}, user.apiLabelsLock, user.updateChLock)
|
||||||
@ -743,13 +757,24 @@ func (user *User) handleUpdateDraftEvent(ctx context.Context, event proton.Messa
|
|||||||
true, // Is the message doesn't exist, silently create it.
|
true, // Is the message doesn't exist, silently create it.
|
||||||
)
|
)
|
||||||
|
|
||||||
user.updateCh[full.AddressID].Enqueue(update)
|
didPublish, err := safePublishMessageUpdate(user, full.AddressID, update)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !didPublish {
|
||||||
|
update = nil
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if update == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
return []imap.Update{update}, nil
|
return []imap.Update{update}, nil
|
||||||
}, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock)
|
}, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock)
|
||||||
}
|
}
|
||||||
@ -816,3 +841,37 @@ func (user *User) reportErrorNoContextCancel(title string, err error, reportCont
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// safePublishMessageUpdate handles the rare case where the address' update channel may have been deleted in the same
|
||||||
|
// event. This rare case can take place if in the same event fetch request there is an update for delete address and
|
||||||
|
// create/update message.
|
||||||
|
// If the user is in combined mode, we simply push the update to the primary address. If the user is in split mode
|
||||||
|
// we do not publish the update as the address no longer exists.
|
||||||
|
func safePublishMessageUpdate(user *User, addressID string, update imap.Update) (bool, error) {
|
||||||
|
v, ok := user.updateCh[addressID]
|
||||||
|
if !ok {
|
||||||
|
if user.GetAddressMode() == vault.CombinedMode {
|
||||||
|
primAddr, err := getPrimaryAddr(user.apiAddrs)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to get primary address: %w", err)
|
||||||
|
}
|
||||||
|
primaryCh, ok := user.updateCh[primAddr.ID]
|
||||||
|
if !ok {
|
||||||
|
return false, fmt.Errorf("primary address channel is not available")
|
||||||
|
}
|
||||||
|
|
||||||
|
primaryCh.Enqueue(update)
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.Warnf("Update channel not found for address %v, it may have been already deleted", addressID)
|
||||||
|
_ = user.reporter.ReportMessage("Message Update channel does not exist")
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
v.Enqueue(update)
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|||||||
@ -83,6 +83,18 @@ func getAddrIdx(apiAddrs map[string]proton.Address, idx int) (proton.Address, er
|
|||||||
return sorted[idx], nil
|
return sorted[idx], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getPrimaryAddr(apiAddrs map[string]proton.Address) (proton.Address, error) {
|
||||||
|
sorted := sortSlice(maps.Values(apiAddrs), func(a, b proton.Address) bool {
|
||||||
|
return a.Order < b.Order
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(sorted) == 0 {
|
||||||
|
return proton.Address{}, fmt.Errorf("no addresses available")
|
||||||
|
}
|
||||||
|
|
||||||
|
return sorted[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
// sortSlice returns the given slice sorted by the given comparator.
|
// sortSlice returns the given slice sorted by the given comparator.
|
||||||
func sortSlice[Item any](items []Item, less func(Item, Item) bool) []Item {
|
func sortSlice[Item any](items []Item, less func(Item, Item) bool) []Item {
|
||||||
sorted := make([]Item, len(items))
|
sorted := make([]Item, len(items))
|
||||||
|
|||||||
Reference in New Issue
Block a user