forked from Silverfish/proton-bridge
fix(GODT-2626): Server Events should not be merged.
d18e5932b28f83b201709a04fb7b8c6f74003574 Includes GPA bump: https://github.com/ProtonMail/go-proton-api/pull/80
This commit is contained in:
@ -690,87 +690,89 @@ func (user *User) doEventPoll(ctx context.Context) error {
|
||||
user.eventLock.Lock()
|
||||
defer user.eventLock.Unlock()
|
||||
|
||||
event, more, err := user.client.GetEvent(ctx, user.vault.EventID())
|
||||
gpaEvents, more, err := user.client.GetEvent(ctx, user.vault.EventID())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get event (caused by %T): %w", internal.ErrCause(err), err)
|
||||
}
|
||||
|
||||
// If the event ID hasn't changed, there are no new events.
|
||||
if event.EventID == user.vault.EventID() {
|
||||
if gpaEvents[len(gpaEvents)-1].EventID == user.vault.EventID() {
|
||||
user.log.Debug("No new API events")
|
||||
return nil
|
||||
}
|
||||
|
||||
user.log.WithFields(logrus.Fields{
|
||||
"old": user.vault.EventID(),
|
||||
"new": event,
|
||||
}).Info("Received new API event")
|
||||
for _, event := range gpaEvents {
|
||||
user.log.WithFields(logrus.Fields{
|
||||
"old": user.vault.EventID(),
|
||||
"new": event,
|
||||
}).Info("Received new API event")
|
||||
|
||||
// Handle the event.
|
||||
if err := user.handleAPIEvent(ctx, event); err != nil {
|
||||
// If the error is a context cancellation, return error to retry later.
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return fmt.Errorf("failed to handle event due to context cancellation: %w", err)
|
||||
// Handle the event.
|
||||
if err := user.handleAPIEvent(ctx, event); err != nil {
|
||||
// If the error is a context cancellation, return error to retry later.
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return fmt.Errorf("failed to handle event due to context cancellation: %w", err)
|
||||
}
|
||||
|
||||
// If the error is a network error, return error to retry later.
|
||||
if netErr := new(proton.NetError); errors.As(err, &netErr) {
|
||||
return fmt.Errorf("failed to handle event due to network issue: %w", err)
|
||||
}
|
||||
|
||||
// Catch all for uncategorized net errors that may slip through.
|
||||
if netErr := new(net.OpError); errors.As(err, &netErr) {
|
||||
return fmt.Errorf("failed to handle event due to network issues (uncategorized): %w", err)
|
||||
}
|
||||
|
||||
// In case a json decode error slips through.
|
||||
if jsonErr := new(json.UnmarshalTypeError); errors.As(err, &jsonErr) {
|
||||
user.eventCh.Enqueue(events.UncategorizedEventError{
|
||||
UserID: user.ID(),
|
||||
Error: err,
|
||||
})
|
||||
|
||||
return fmt.Errorf("failed to handle event due to JSON issue: %w", err)
|
||||
}
|
||||
|
||||
// If the error is an unexpected EOF, return error to retry later.
|
||||
if errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return fmt.Errorf("failed to handle event due to EOF: %w", err)
|
||||
}
|
||||
|
||||
// If the error is a server-side issue, return error to retry later.
|
||||
if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status >= 500 {
|
||||
return fmt.Errorf("failed to handle event due to server error: %w", err)
|
||||
}
|
||||
|
||||
// Otherwise, the error is a client-side issue; notify bridge to handle it.
|
||||
user.log.WithField("event", event).Warn("Failed to handle API event")
|
||||
|
||||
user.eventCh.Enqueue(events.UserBadEvent{
|
||||
UserID: user.ID(),
|
||||
OldEventID: user.vault.EventID(),
|
||||
NewEventID: event.EventID,
|
||||
EventInfo: event.String(),
|
||||
Error: err,
|
||||
})
|
||||
|
||||
return fmt.Errorf("failed to handle event due to client error: %w", err)
|
||||
}
|
||||
|
||||
// If the error is a network error, return error to retry later.
|
||||
if netErr := new(proton.NetError); errors.As(err, &netErr) {
|
||||
return fmt.Errorf("failed to handle event due to network issue: %w", err)
|
||||
}
|
||||
user.log.WithField("event", event).Debug("Handled API event")
|
||||
|
||||
// Catch all for uncategorized net errors that may slip through.
|
||||
if netErr := new(net.OpError); errors.As(err, &netErr) {
|
||||
return fmt.Errorf("failed to handle event due to network issues (uncategorized): %w", err)
|
||||
}
|
||||
|
||||
// In case a json decode error slips through.
|
||||
if jsonErr := new(json.UnmarshalTypeError); errors.As(err, &jsonErr) {
|
||||
user.eventCh.Enqueue(events.UncategorizedEventError{
|
||||
// Update the event ID in the vault. If this fails, notify bridge to handle it.
|
||||
if err := user.vault.SetEventID(event.EventID); err != nil {
|
||||
user.eventCh.Enqueue(events.UserBadEvent{
|
||||
UserID: user.ID(),
|
||||
Error: err,
|
||||
})
|
||||
|
||||
return fmt.Errorf("failed to handle event due to JSON issue: %w", err)
|
||||
return fmt.Errorf("failed to update event ID: %w", err)
|
||||
}
|
||||
|
||||
// If the error is an unexpected EOF, return error to retry later.
|
||||
if errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return fmt.Errorf("failed to handle event due to EOF: %w", err)
|
||||
}
|
||||
|
||||
// If the error is a server-side issue, return error to retry later.
|
||||
if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status >= 500 {
|
||||
return fmt.Errorf("failed to handle event due to server error: %w", err)
|
||||
}
|
||||
|
||||
// Otherwise, the error is a client-side issue; notify bridge to handle it.
|
||||
user.log.WithField("event", event).Warn("Failed to handle API event")
|
||||
|
||||
user.eventCh.Enqueue(events.UserBadEvent{
|
||||
UserID: user.ID(),
|
||||
OldEventID: user.vault.EventID(),
|
||||
NewEventID: event.EventID,
|
||||
EventInfo: event.String(),
|
||||
Error: err,
|
||||
})
|
||||
|
||||
return fmt.Errorf("failed to handle event due to client error: %w", err)
|
||||
user.log.WithField("eventID", event.EventID).Debug("Updated event ID in vault")
|
||||
}
|
||||
|
||||
user.log.WithField("event", event).Debug("Handled API event")
|
||||
|
||||
// Update the event ID in the vault. If this fails, notify bridge to handle it.
|
||||
if err := user.vault.SetEventID(event.EventID); err != nil {
|
||||
user.eventCh.Enqueue(events.UserBadEvent{
|
||||
UserID: user.ID(),
|
||||
Error: err,
|
||||
})
|
||||
|
||||
return fmt.Errorf("failed to update event ID: %w", err)
|
||||
}
|
||||
|
||||
user.log.WithField("eventID", event.EventID).Debug("Updated event ID in vault")
|
||||
|
||||
if more {
|
||||
user.goPollAPIEvents(false)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user