mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-10 04:36:43 +00:00
fix(GODT-3033): Unable to receive new mail
If the IMAP service happened to finish syncing and wanted to reset the user event service at a time the latter was publishing an event a deadlock would occur and the user would not receive any new messages. This change puts the request to revert the event id in a separate go-routine to avoid this situation from re-occurring. The operational flow remains unchanged as the event service will only process this request once the current set of events have been published.
This commit is contained in:
@ -22,6 +22,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ProtonMail/gluon/async"
|
"github.com/ProtonMail/gluon/async"
|
||||||
@ -94,7 +95,7 @@ type Service struct {
|
|||||||
|
|
||||||
syncConfigPath string
|
syncConfigPath string
|
||||||
lastHandledEventID string
|
lastHandledEventID string
|
||||||
isSyncing bool
|
isSyncing atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(
|
func NewService(
|
||||||
@ -389,23 +390,28 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start a goroutine to wait on event reset as it is possible that the sync received message
|
||||||
|
// was processed during an event publish. This in turn will block the imap service, since the
|
||||||
|
// event service is unable to reply to the request until the events have been processed.
|
||||||
s.log.Info("Sync complete, starting API event stream")
|
s.log.Info("Sync complete, starting API event stream")
|
||||||
if err := s.eventProvider.RewindEventID(ctx, s.lastHandledEventID); err != nil {
|
go func() {
|
||||||
if errors.Is(err, context.Canceled) {
|
if err := s.eventProvider.RewindEventID(ctx, s.lastHandledEventID); err != nil {
|
||||||
continue
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.log.WithError(err).Error("Failed to rewind event service")
|
||||||
|
s.eventPublisher.PublishEvent(ctx, events.UserBadEvent{
|
||||||
|
UserID: s.identityState.UserID(),
|
||||||
|
OldEventID: "",
|
||||||
|
NewEventID: "",
|
||||||
|
EventInfo: "",
|
||||||
|
Error: fmt.Errorf("failed to rewind event loop: %w", err),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
s.log.WithError(err).Error("Failed to rewind event service")
|
s.isSyncing.Store(false)
|
||||||
s.eventPublisher.PublishEvent(ctx, events.UserBadEvent{
|
}()
|
||||||
UserID: s.identityState.UserID(),
|
|
||||||
OldEventID: "",
|
|
||||||
NewEventID: "",
|
|
||||||
EventInfo: "",
|
|
||||||
Error: fmt.Errorf("failed to rewind event loop: %w", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
s.isSyncing = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case request, ok := <-s.syncUpdateApplier.requestCh:
|
case request, ok := <-s.syncUpdateApplier.requestCh:
|
||||||
@ -427,7 +433,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
e.Consume(func(event proton.Event) error {
|
e.Consume(func(event proton.Event) error {
|
||||||
if s.isSyncing {
|
if s.isSyncing.Load() {
|
||||||
if err := syncEventHandler.OnEvent(ctx, event); err != nil {
|
if err := syncEventHandler.OnEvent(ctx, event); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -610,13 +616,13 @@ func (s *Service) setShowAllMail(v bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) startSyncing() {
|
func (s *Service) startSyncing() {
|
||||||
s.isSyncing = true
|
s.isSyncing.Store(true)
|
||||||
s.syncHandler.Execute(s.syncReporter, s.labels.GetLabelMap(), s.syncUpdateApplier, s.syncMessageBuilder, syncservice.DefaultRetryCoolDown)
|
s.syncHandler.Execute(s.syncReporter, s.labels.GetLabelMap(), s.syncUpdateApplier, s.syncMessageBuilder, syncservice.DefaultRetryCoolDown)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) cancelSync() {
|
func (s *Service) cancelSync() {
|
||||||
s.syncHandler.CancelAndWait()
|
s.syncHandler.CancelAndWait()
|
||||||
s.isSyncing = false
|
s.isSyncing.Store(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
type resyncReq struct{}
|
type resyncReq struct{}
|
||||||
|
|||||||
Reference in New Issue
Block a user