diff --git a/internal/app/base/base.go b/internal/app/base/base.go index 7d2edd58..46232bd9 100644 --- a/internal/app/base/base.go +++ b/internal/app/base/base.go @@ -157,12 +157,15 @@ func New( // nolint[funlen] } apiConfig := pmapi.GetAPIConfig(configName, constants.Version) - apiConfig.NoConnectionHandler = func() { + apiConfig.ConnectionOffHandler = func() { eventListener.Emit(events.InternetOffEvent, "") } - apiConfig.ConnectionHandler = func() { + apiConfig.ConnectionOnHandler = func() { eventListener.Emit(events.InternetOnEvent, "") } + apiConfig.UpgradeApplicationHandler = func() { + eventListener.Emit(events.UpgradeApplicationEvent, "") + } cm := pmapi.NewClientManager(apiConfig) cm.SetRoundTripper(pmapi.GetRoundTripper(cm, listener)) cm.SetCookieJar(jar) diff --git a/internal/app/bridge/bridge.go b/internal/app/bridge/bridge.go index 03982801..c3b23acc 100644 --- a/internal/app/bridge/bridge.go +++ b/internal/app/bridge/bridge.go @@ -78,6 +78,7 @@ func run(b *base.Base, c *cli.Context) error { // nolint[funlen] defer b.CrashHandler.HandlePanic() imapPort := b.Settings.GetInt(settings.IMAPPortKey) imap.NewIMAPServer( + b.CrashHandler, c.String("log-imap") == "client" || c.String("log-imap") == "all", c.String("log-imap") == "server" || c.String("log-imap") == "all", imapPort, tlsConfig, imapBackend, b.Listener).ListenAndServe() diff --git a/internal/frontend/qt-common/accounts.go b/internal/frontend/qt-common/accounts.go index 495f1528..be05a186 100644 --- a/internal/frontend/qt-common/accounts.go +++ b/internal/frontend/qt-common/accounts.go @@ -172,7 +172,6 @@ func (a *Accounts) showLoginError(err error, scope string) bool { } a.qml.SetConnectionStatus(true) // If we are here connection is ok. if err == pmapi.ErrUpgradeApplication { - a.qml.EmitEvent(events.UpgradeApplicationEvent, "") return true } a.qml.SetAddAccountWarning(err.Error(), -1) diff --git a/internal/frontend/qt/accounts.go b/internal/frontend/qt/accounts.go index 8fc0766d..a86aacd5 100644 --- a/internal/frontend/qt/accounts.go +++ b/internal/frontend/qt/accounts.go @@ -128,7 +128,6 @@ func (s *FrontendQt) showLoginError(err error, scope string) bool { } s.Qml.SetConnectionStatus(true) // If we are here connection is ok. if err == pmapi.ErrUpgradeApplication { - s.eventListener.Emit(events.UpgradeApplicationEvent, "") return true } s.Qml.SetAddAccountWarning(err.Error(), -1) diff --git a/internal/imap/backend.go b/internal/imap/backend.go index 545aceb1..6e2d0a59 100644 --- a/internal/imap/backend.go +++ b/internal/imap/backend.go @@ -37,7 +37,7 @@ type panicHandler interface { type imapBackend struct { panicHandler panicHandler bridge bridger - updates chan goIMAPBackend.Update + updates *imapUpdates eventListener listener.Listener users map[string]*imapUser @@ -46,9 +46,6 @@ type imapBackend struct { imapCache map[string]map[string]string imapCachePath string imapCacheLock *sync.RWMutex - - updatesBlocking map[string]bool - updatesBlockingLocker sync.Locker } // NewIMAPBackend returns struct implementing go-imap/backend interface. @@ -75,7 +72,7 @@ func newIMAPBackend( return &imapBackend{ panicHandler: panicHandler, bridge: bridge, - updates: make(chan goIMAPBackend.Update), + updates: newIMAPUpdates(), eventListener: eventListener, users: map[string]*imapUser{}, @@ -83,9 +80,6 @@ func newIMAPBackend( imapCachePath: cache.GetIMAPCachePath(), imapCacheLock: &sync.RWMutex{}, - - updatesBlocking: map[string]bool{}, - updatesBlockingLocker: &sync.Mutex{}, } } @@ -172,7 +166,7 @@ func (ib *imapBackend) Login(_ *imap.ConnInfo, username, password string) (goIMA // so that it doesn't make bridge slow for users who are only using bridge for SMTP // (otherwise the store will be locked for 1 sec per email during synchronization). if store := imapUser.user.GetStore(); store != nil { - store.SetChangeNotifier(ib) + store.SetChangeNotifier(ib.updates) } return imapUser, nil @@ -183,7 +177,7 @@ func (ib *imapBackend) Updates() <-chan goIMAPBackend.Update { // Called from go-imap in goroutines - we need to handle panics for each function. defer ib.panicHandler.HandlePanic() - return ib.updates + return ib.updates.ch } func (ib *imapBackend) CreateMessageLimit() *uint32 { diff --git a/internal/imap/mailbox.go b/internal/imap/mailbox.go index cfb8d9b5..3593112b 100644 --- a/internal/imap/mailbox.go +++ b/internal/imap/mailbox.go @@ -188,8 +188,8 @@ func (im *imapMailbox) Expunge() error { // the desired mailbox. im.user.waitForAppend() - im.user.backend.setUpdatesBeBlocking(im.user.currentAddressLowercase, im.name, operationDeleteMessage) - defer im.user.backend.unsetUpdatesBeBlocking(im.user.currentAddressLowercase, im.name, operationDeleteMessage) + im.user.backend.updates.block(im.user.currentAddressLowercase, im.name, operationDeleteMessage) + defer im.user.backend.updates.unblock(im.user.currentAddressLowercase, im.name, operationDeleteMessage) return im.storeMailbox.RemoveDeleted(nil) } diff --git a/internal/imap/mailbox_message.go b/internal/imap/mailbox_message.go index baf091b3..8a138404 100644 --- a/internal/imap/mailbox_message.go +++ b/internal/imap/mailbox_message.go @@ -263,6 +263,7 @@ func (im *imapMailbox) getMessage(storeMessage storeMessageProvider, items []ima // Size attribute on the server counts encrypted data. The value is cleared // on our part and we need to compute "real" size of decrypted data. if m.Size <= 0 { + im.log.WithField("msgID", storeMessage.ID()).Trace("Size unknown - downloading body") if _, _, err = im.getBodyStructure(storeMessage); err != nil { return } @@ -308,14 +309,14 @@ func (im *imapMailbox) getBodyStructure(storeMessage storeMessageProvider) ( if bodyReader, structure = cache.LoadMail(id); bodyReader.Len() == 0 || structure == nil { var body []byte structure, body, err = im.buildMessage(m) + m.Size = int64(len(body)) + if err := storeMessage.SetSize(m.Size); err != nil { + im.log.WithError(err). + WithField("newSize", m.Size). + WithField("msgID", m.ID). + Warn("Cannot update size while building") + } if err == nil && structure != nil && len(body) > 0 { - m.Size = int64(len(body)) - if err := storeMessage.SetSize(m.Size); err != nil { - im.log.WithError(err). - WithField("newSize", m.Size). - WithField("msgID", m.ID). - Warn("Cannot update size while building") - } if err := storeMessage.SetContentTypeAndHeader(m.MIMEType, m.Header); err != nil { im.log.WithError(err). WithField("msgID", m.ID). diff --git a/internal/imap/mailbox_messages.go b/internal/imap/mailbox_messages.go index cc321d78..41b36447 100644 --- a/internal/imap/mailbox_messages.go +++ b/internal/imap/mailbox_messages.go @@ -46,8 +46,8 @@ func (im *imapMailbox) UpdateMessagesFlags(uid bool, seqSet *imap.SeqSet, operat // Called from go-imap in goroutines - we need to handle panics for each function. defer im.panicHandler.HandlePanic() - im.user.backend.setUpdatesBeBlocking(im.user.currentAddressLowercase, im.name, operationUpdateMessage) - defer im.user.backend.unsetUpdatesBeBlocking(im.user.currentAddressLowercase, im.name, operationUpdateMessage) + im.user.backend.updates.block(im.user.currentAddressLowercase, im.name, operationUpdateMessage) + defer im.user.backend.updates.unblock(im.user.currentAddressLowercase, im.name, operationUpdateMessage) messageIDs, err := im.apiIDsFromSeqSet(uid, seqSet) if err != nil || len(messageIDs) == 0 { @@ -455,8 +455,8 @@ func (im *imapMailbox) ListMessages(isUID bool, seqSet *imap.SeqSet, items []ima // EXPUNGE cannot be sent during listing and can come only from // the event loop, so we prevent any server side update to avoid // the problem. - im.user.pauseEventLoop() - defer im.user.unpauseEventLoop() + im.user.backend.updates.forbidExpunge(im.storeMailbox.LabelID()) + defer im.user.backend.updates.allowExpunge(im.storeMailbox.LabelID()) } var markAsReadIDs []string diff --git a/internal/imap/server.go b/internal/imap/server.go index 9001fbcd..c1ca74d0 100644 --- a/internal/imap/server.go +++ b/internal/imap/server.go @@ -43,16 +43,15 @@ import ( ) type imapServer struct { + panicHandler panicHandler server *imapserver.Server eventListener listener.Listener debugClient bool debugServer bool - - on bool } // NewIMAPServer constructs a new IMAP server configured with the given options. -func NewIMAPServer(debugClient, debugServer bool, port int, tls *tls.Config, imapBackend *imapBackend, eventListener listener.Listener) *imapServer { //nolint[golint] +func NewIMAPServer(panicHandler panicHandler, debugClient, debugServer bool, port int, tls *tls.Config, imapBackend *imapBackend, eventListener listener.Listener) *imapServer { //nolint[golint] s := imapserver.New(imapBackend) s.Addr = fmt.Sprintf("%v:%v", bridge.Host, port) s.TLSConfig = tls @@ -98,6 +97,7 @@ func NewIMAPServer(debugClient, debugServer bool, port int, tls *tls.Config, ima ) return &imapServer{ + panicHandler: panicHandler, server: s, eventListener: eventListener, debugClient: debugClient, @@ -114,14 +114,6 @@ func (s *imapServer) ListenAndServe() { } func (s *imapServer) listenAndServe() { - if s.on { - return - } - s.on = true - defer func() { - s.on = false - }() - log.Info("IMAP server listening at ", s.server.Addr) l, err := net.Listen("tcp", s.server.Addr) if err != nil { @@ -135,9 +127,18 @@ func (s *imapServer) listenAndServe() { server: s, }) if err != nil { - s.eventListener.Emit(events.ErrorEvent, "IMAP failed: "+err.Error()) - log.Error("IMAP failed: ", err) - return + failed := true + if netErr, ok := err.(*net.OpError); ok { + originalErr := netErr.Unwrap() + if originalErr != nil && originalErr.Error() == "use of closed network connection" { + failed = false + } + } + if failed { + s.eventListener.Emit(events.ErrorEvent, "IMAP failed: "+err.Error()) + log.Error("IMAP failed: ", err) + return + } } defer s.server.Close() //nolint[errcheck] @@ -146,6 +147,7 @@ func (s *imapServer) listenAndServe() { // Stops the server. func (s *imapServer) Close() { + log.Info("Closing IMAP server") if err := s.server.Close(); err != nil { log.WithError(err).Error("Failed to close the connection") } @@ -157,16 +159,30 @@ func (s *imapServer) monitorInternetConnection() { off := make(chan string) s.eventListener.Add(events.InternetOffEvent, off) - go func() { - for range on { - s.listenAndServe() - } - }() - go func() { - for range off { + isOn := true + for { + select { + case <-on: + if isOn { + continue + } + isOn = true + go func() { + defer s.panicHandler.HandlePanic() + s.listenAndServe() + }() + case <-off: + if !isOn { + continue + } + isOn = false s.Close() } - }() + // Give it some time to serve or close server before changing it again. + // E.g., if we get quickly off-on signal, starting or closing could + // fail because server is still running or not yet, respectively. + time.Sleep(10 * time.Second) + } } func (s *imapServer) monitorDisconnectedUsers() { diff --git a/internal/imap/store.go b/internal/imap/store.go index fec7cae9..afce5d1f 100644 --- a/internal/imap/store.go +++ b/internal/imap/store.go @@ -42,8 +42,6 @@ type storeUserProvider interface { attachedPublicKeyName string, parentID string) (*pmapi.Message, []*pmapi.Attachment, error) - PauseEventLoop(bool) - SetChangeNotifier(store.ChangeNotifier) } diff --git a/internal/imap/backend_updates.go b/internal/imap/updates.go similarity index 60% rename from internal/imap/backend_updates.go rename to internal/imap/updates.go index b148404d..29f82700 100644 --- a/internal/imap/backend_updates.go +++ b/internal/imap/updates.go @@ -19,6 +19,7 @@ package imap import ( "strings" + "sync" "time" "github.com/ProtonMail/proton-bridge/internal/store" @@ -36,32 +37,82 @@ const ( operationDeleteMessage operation = "expunge" ) -func (ib *imapBackend) setUpdatesBeBlocking(address, mailboxName string, op operation) { - ib.changeUpdatesBlocking(address, mailboxName, op, true) +type imapUpdates struct { + lock sync.Locker + blocking map[string]bool + delayedExpunges map[string][]chan struct{} + ch chan goIMAPBackend.Update } -func (ib *imapBackend) unsetUpdatesBeBlocking(address, mailboxName string, op operation) { - ib.changeUpdatesBlocking(address, mailboxName, op, false) -} - -func (ib *imapBackend) changeUpdatesBlocking(address, mailboxName string, op operation, block bool) { - ib.updatesBlockingLocker.Lock() - defer ib.updatesBlockingLocker.Unlock() - - key := strings.ToLower(address + "_" + mailboxName + "_" + string(op)) - if block { - ib.updatesBlocking[key] = true - } else { - delete(ib.updatesBlocking, key) +func newIMAPUpdates() *imapUpdates { + return &imapUpdates{ + lock: &sync.Mutex{}, + blocking: map[string]bool{}, + delayedExpunges: map[string][]chan struct{}{}, + ch: make(chan goIMAPBackend.Update), } } -func (ib *imapBackend) isBlocking(address, mailboxName string, op operation) bool { - key := strings.ToLower(address + "_" + mailboxName + "_" + string(op)) - return ib.updatesBlocking[key] +func (iu *imapUpdates) block(address, mailboxName string, op operation) { + iu.lock.Lock() + defer iu.lock.Unlock() + + iu.blocking[getBlockingKey(address, mailboxName, op)] = true } -func (ib *imapBackend) Notice(address, notice string) { +func (iu *imapUpdates) unblock(address, mailboxName string, op operation) { + iu.lock.Lock() + defer iu.lock.Unlock() + + delete(iu.blocking, getBlockingKey(address, mailboxName, op)) +} + +func (iu *imapUpdates) isBlocking(address, mailboxName string, op operation) bool { + iu.lock.Lock() + defer iu.lock.Unlock() + + return iu.blocking[getBlockingKey(address, mailboxName, op)] +} + +func getBlockingKey(address, mailboxName string, op operation) string { + return strings.ToLower(address + "_" + mailboxName + "_" + string(op)) +} + +func (iu *imapUpdates) forbidExpunge(mailboxID string) { + iu.lock.Lock() + defer iu.lock.Unlock() + + iu.delayedExpunges[mailboxID] = []chan struct{}{} +} + +func (iu *imapUpdates) allowExpunge(mailboxID string) { + iu.lock.Lock() + defer iu.lock.Unlock() + + for _, ch := range iu.delayedExpunges[mailboxID] { + close(ch) + } + delete(iu.delayedExpunges, mailboxID) +} + +func (iu *imapUpdates) CanDelete(mailboxID string) (bool, func()) { + iu.lock.Lock() + defer iu.lock.Unlock() + + if iu.delayedExpunges[mailboxID] == nil { + return true, nil + } + + ch := make(chan struct{}) + iu.delayedExpunges[mailboxID] = append(iu.delayedExpunges[mailboxID], ch) + return false, func() { + log.WithField("mailbox", mailboxID).Debug("Expunge operations paused") + <-ch + log.WithField("mailbox", mailboxID).Debug("Expunge operations unpaused") + } +} + +func (iu *imapUpdates) Notice(address, notice string) { update := new(goIMAPBackend.StatusUpdate) update.Update = goIMAPBackend.NewUpdate(address, "") update.StatusResp = &imap.StatusResp{ @@ -69,10 +120,10 @@ func (ib *imapBackend) Notice(address, notice string) { Code: imap.CodeAlert, Info: notice, } - ib.sendIMAPUpdate(update, false) + iu.sendIMAPUpdate(update, false) } -func (ib *imapBackend) UpdateMessage( +func (iu *imapUpdates) UpdateMessage( address, mailboxName string, uid, sequenceNumber uint32, msg *pmapi.Message, hasDeletedFlag bool, @@ -93,10 +144,10 @@ func (ib *imapBackend) UpdateMessage( update.Message.Flags = append(update.Message.Flags, imap.DeletedFlag) } update.Message.Uid = uid - ib.sendIMAPUpdate(update, ib.isBlocking(address, mailboxName, operationUpdateMessage)) + iu.sendIMAPUpdate(update, iu.isBlocking(address, mailboxName, operationUpdateMessage)) } -func (ib *imapBackend) DeleteMessage(address, mailboxName string, sequenceNumber uint32) { +func (iu *imapUpdates) DeleteMessage(address, mailboxName string, sequenceNumber uint32) { log.WithFields(logrus.Fields{ "address": address, "mailbox": mailboxName, @@ -105,10 +156,10 @@ func (ib *imapBackend) DeleteMessage(address, mailboxName string, sequenceNumber update := new(goIMAPBackend.ExpungeUpdate) update.Update = goIMAPBackend.NewUpdate(address, mailboxName) update.SeqNum = sequenceNumber - ib.sendIMAPUpdate(update, ib.isBlocking(address, mailboxName, operationDeleteMessage)) + iu.sendIMAPUpdate(update, iu.isBlocking(address, mailboxName, operationDeleteMessage)) } -func (ib *imapBackend) MailboxCreated(address, mailboxName string) { +func (iu *imapUpdates) MailboxCreated(address, mailboxName string) { log.WithFields(logrus.Fields{ "address": address, "mailbox": mailboxName, @@ -120,10 +171,10 @@ func (ib *imapBackend) MailboxCreated(address, mailboxName string) { Delimiter: store.PathDelimiter, Name: mailboxName, } - ib.sendIMAPUpdate(update, false) + iu.sendIMAPUpdate(update, false) } -func (ib *imapBackend) MailboxStatus(address, mailboxName string, total, unread, unreadSeqNum uint32) { +func (iu *imapUpdates) MailboxStatus(address, mailboxName string, total, unread, unreadSeqNum uint32) { log.WithFields(logrus.Fields{ "address": address, "mailbox": mailboxName, @@ -137,11 +188,11 @@ func (ib *imapBackend) MailboxStatus(address, mailboxName string, total, unread, update.MailboxStatus.Messages = total update.MailboxStatus.Unseen = unread update.MailboxStatus.UnseenSeqNum = unreadSeqNum - ib.sendIMAPUpdate(update, false) + iu.sendIMAPUpdate(update, false) } -func (ib *imapBackend) sendIMAPUpdate(update goIMAPBackend.Update, block bool) { - if ib.updates == nil { +func (iu *imapUpdates) sendIMAPUpdate(update goIMAPBackend.Update, block bool) { + if iu.ch == nil { log.Trace("IMAP IDLE unavailable") return } @@ -152,7 +203,7 @@ func (ib *imapBackend) sendIMAPUpdate(update goIMAPBackend.Update, block bool) { case <-time.After(1 * time.Second): log.Warn("IMAP update could not be sent (timeout)") return - case ib.updates <- update: + case iu.ch <- update: } }() diff --git a/internal/imap/user.go b/internal/imap/user.go index a4c72842..909c3ead 100644 --- a/internal/imap/user.go +++ b/internal/imap/user.go @@ -42,9 +42,6 @@ type imapUser struct { currentAddressLowercase string appendInProcess sync.WaitGroup - - eventLoopPausingCounter int - eventLoopPausingLocker sync.Locker } // newIMAPUser returns struct implementing go-imap/user interface. @@ -76,8 +73,6 @@ func newIMAPUser( storeAddress: storeAddress, currentAddressLowercase: strings.ToLower(address), - - eventLoopPausingLocker: &sync.Mutex{}, }, err } @@ -86,31 +81,6 @@ func (iu *imapUser) client() pmapi.Client { return iu.user.GetTemporaryPMAPIClient() } -// pauseEventLoop pauses event loop and increases the number of mailboxes which -// is performing action forbidding event loop to run (such as FETCH which needs -// UIDs to be stable and thus EXPUNGE cannot be done during the request). -func (iu *imapUser) pauseEventLoop() { - iu.eventLoopPausingLocker.Lock() - defer iu.eventLoopPausingLocker.Unlock() - - iu.eventLoopPausingCounter++ - iu.storeUser.PauseEventLoop(true) -} - -// unpauseEventLoop unpauses event loop but only if no other request is not -// performing action forbidding event loop to run (see pauseEventLoop). -func (iu *imapUser) unpauseEventLoop() { - iu.eventLoopPausingLocker.Lock() - defer iu.eventLoopPausingLocker.Unlock() - - if iu.eventLoopPausingCounter > 0 { - iu.eventLoopPausingCounter-- - } - if iu.eventLoopPausingCounter == 0 { - iu.storeUser.PauseEventLoop(false) - } -} - func (iu *imapUser) isSubscribed(labelID string) bool { subscriptionExceptions := iu.backend.getCacheList(iu.storeUser.UserID(), SubscriptionException) exceptions := strings.Split(subscriptionExceptions, ";") diff --git a/internal/store/change.go b/internal/store/change.go index 74de7407..e1b633fe 100644 --- a/internal/store/change.go +++ b/internal/store/change.go @@ -30,6 +30,8 @@ type ChangeNotifier interface { DeleteMessage(address, mailboxName string, sequenceNumber uint32) MailboxCreated(address, mailboxName string) MailboxStatus(address, mailboxName string, total, unread, unreadSeqNum uint32) + + CanDelete(mailboxID string) (bool, func()) } // SetChangeNotifier sets notifier to be called once mailbox or message changes. diff --git a/internal/store/event_loop.go b/internal/store/event_loop.go index 0f59501c..afd0e01c 100644 --- a/internal/store/event_loop.go +++ b/internal/store/event_loop.go @@ -39,8 +39,6 @@ type eventLoop struct { stopCh chan struct{} notifyStopCh chan struct{} isRunning bool // The whole event loop is running. - isTickerPaused bool // The periodic loop is paused (but the event loop itself is still running). - hasInternet bool pollCounter int @@ -60,7 +58,6 @@ func newEventLoop(cache *Cache, store *Store, user BridgeUser, events listener.L currentEventID: cache.getEventID(user.ID()), pollCh: make(chan chan struct{}), isRunning: false, - isTickerPaused: false, log: eventLog, @@ -135,8 +132,6 @@ func (loop *eventLoop) start() { loop.log.WithField("lastEventID", loop.currentEventID).Warn("Subscription stopped") }() - loop.hasInternet = true - go loop.pollNow() loop.loop() @@ -154,10 +149,6 @@ func (loop *eventLoop) loop() { close(loop.notifyStopCh) return case <-t.C: - if loop.isTickerPaused { - loop.log.Trace("Event loop paused, skipping") - continue - } // Randomise periodic calls within range pollInterval ± pollSpread to reduces potential load spikes on API. time.Sleep(time.Duration(rand.Intn(2*int(pollIntervalSpread.Milliseconds()))) * time.Millisecond) case eventProcessedCh = <-loop.pollCh: @@ -220,8 +211,6 @@ func (loop *eventLoop) processNextEvent() (more bool, err error) { // nolint[fun defer func() { if errors.Cause(err) == pmapi.ErrAPINotReachable { l.Warn("Internet unavailable") - loop.events.Emit(bridgeEvents.InternetOffEvent, "") - loop.hasInternet = false err = nil } @@ -233,7 +222,6 @@ func (loop *eventLoop) processNextEvent() (more bool, err error) { // nolint[fun if errors.Cause(err) == pmapi.ErrUpgradeApplication { l.Warn("Need to upgrade application") - loop.events.Emit(bridgeEvents.UpgradeApplicationEvent, "") err = nil } @@ -267,11 +255,6 @@ func (loop *eventLoop) processNextEvent() (more bool, err error) { // nolint[fun l = l.WithField("newEventID", event.EventID) - if !loop.hasInternet { - loop.events.Emit(bridgeEvents.InternetOnEvent, "") - loop.hasInternet = true - } - if err = loop.processEvent(event); err != nil { return false, errors.Wrap(err, "failed to process event") } @@ -464,6 +447,7 @@ func (loop *eventLoop) processMessages(eventLog *logrus.Entry, messages []*pmapi updateMessage(msgLog, msg, message.Updated) + loop.removeLabelFromMessageWait(message.Updated.LabelIDsRemoved) if err = loop.store.createOrUpdateMessageEvent(msg); err != nil { return errors.Wrap(err, "failed to update message in DB") } @@ -471,6 +455,7 @@ func (loop *eventLoop) processMessages(eventLog *logrus.Entry, messages []*pmapi case pmapi.EventDelete: msgLog.Debug("Processing EventDelete for message") + loop.removeMessageWait(message.ID) if err = loop.store.deleteMessageEvent(message.ID); err != nil { return errors.Wrap(err, "failed to delete message from DB") } @@ -480,6 +465,40 @@ func (loop *eventLoop) processMessages(eventLog *logrus.Entry, messages []*pmapi return err } +// removeMessageWait waits for notifier to be ready to accept delete +// operations for given message. It's no-op if message does not exist. +func (loop *eventLoop) removeMessageWait(msgID string) { + msg, err := loop.store.getMessageFromDB(msgID) + if err != nil { + return + } + loop.removeLabelFromMessageWait(msg.LabelIDs) +} + +// removeLabelFromMessageWait waits for notifier to be ready to accept +// delete operations for given labels. +func (loop *eventLoop) removeLabelFromMessageWait(labelIDs []string) { + if len(labelIDs) == 0 || loop.store.notifier == nil { + return + } + + for { + wasWaiting := false + for _, labelID := range labelIDs { + canDelete, wait := loop.store.notifier.CanDelete(labelID) + if !canDelete { + wasWaiting = true + wait() + } + } + // If we had to wait for some label, we need to check again + // all labels in case something changed in the meantime. + if !wasWaiting { + return + } + } +} + func updateMessage(msgLog *logrus.Entry, message *pmapi.Message, updates *pmapi.EventMessageUpdated) { //nolint[funlen] msgLog.Debug("Updating message") diff --git a/internal/store/mocks/mocks.go b/internal/store/mocks/mocks.go index b8921cc5..9f51ed77 100644 --- a/internal/store/mocks/mocks.go +++ b/internal/store/mocks/mocks.go @@ -266,6 +266,21 @@ func (m *MockChangeNotifier) EXPECT() *MockChangeNotifierMockRecorder { return m.recorder } +// CanDelete mocks base method +func (m *MockChangeNotifier) CanDelete(arg0 string) (bool, func()) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CanDelete", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(func()) + return ret0, ret1 +} + +// CanDelete indicates an expected call of CanDelete +func (mr *MockChangeNotifierMockRecorder) CanDelete(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanDelete", reflect.TypeOf((*MockChangeNotifier)(nil).CanDelete), arg0) +} + // DeleteMessage mocks base method func (m *MockChangeNotifier) DeleteMessage(arg0, arg1 string, arg2 uint32) { m.ctrl.T.Helper() diff --git a/internal/store/store.go b/internal/store/store.go index 1f038ecd..6ff06052 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -350,18 +350,6 @@ func (store *Store) addAddress(address, addressID string, labels []*pmapi.Label) return } -// PauseEventLoop sets whether the ticker is periodically polling or not. -func (store *Store) PauseEventLoop(pause bool) { - store.lock.Lock() - defer store.lock.Unlock() - - store.log.WithField("pause", pause).Info("Pausing event loop") - - if store.eventLoop != nil { - store.eventLoop.isTickerPaused = pause - } -} - // Close stops the event loop and closes the database to free the file. func (store *Store) Close() error { store.lock.Lock() diff --git a/internal/users/user.go b/internal/users/user.go index 98836588..bd0783e5 100644 --- a/internal/users/user.go +++ b/internal/users/user.go @@ -152,12 +152,7 @@ func (u *User) authorizeIfNecessary(emitEvent bool) (err error) { u.log.WithError(err).Error("Could not authorize and unlock user") switch errors.Cause(err) { - case pmapi.ErrUpgradeApplication: - u.listener.Emit(events.UpgradeApplicationEvent, "") - - case pmapi.ErrAPINotReachable: - u.listener.Emit(events.InternetOffEvent, "") - + case pmapi.ErrUpgradeApplication, pmapi.ErrAPINotReachable: default: if errLogout := u.credStorer.Logout(u.userID); errLogout != nil { u.log.WithField("err", errLogout).Error("Could not log user out from credentials store") diff --git a/internal/users/user_new_test.go b/internal/users/user_new_test.go index 47c0719a..1c15b4d8 100644 --- a/internal/users/user_new_test.go +++ b/internal/users/user_new_test.go @@ -23,7 +23,6 @@ import ( "github.com/ProtonMail/proton-bridge/internal/events" "github.com/ProtonMail/proton-bridge/internal/users/credentials" - "github.com/ProtonMail/proton-bridge/pkg/pmapi" gomock "github.com/golang/mock/gomock" a "github.com/stretchr/testify/assert" ) @@ -38,44 +37,6 @@ func TestNewUserNoCredentialsStore(t *testing.T) { a.Error(t, err) } -func TestNewUserAppOutdated(t *testing.T) { - m := initMocks(t) - defer m.ctrl.Finish() - - m.clientManager.EXPECT().GetClient("user").Return(m.pmapiClient).MinTimes(1) - - gomock.InOrder( - m.credentialsStore.EXPECT().Get("user").Return(testCredentials, nil), - m.credentialsStore.EXPECT().Get("user").Return(testCredentials, nil), - m.pmapiClient.EXPECT().AuthRefresh("token").Return(nil, pmapi.ErrUpgradeApplication), - m.eventListener.EXPECT().Emit(events.UpgradeApplicationEvent, ""), - m.pmapiClient.EXPECT().ListLabels().Return(nil, pmapi.ErrUpgradeApplication), - m.pmapiClient.EXPECT().Addresses().Return(nil), - ) - - checkNewUserHasCredentials(testCredentials, m) -} - -func TestNewUserNoInternetConnection(t *testing.T) { - m := initMocks(t) - defer m.ctrl.Finish() - - m.clientManager.EXPECT().GetClient("user").Return(m.pmapiClient).MinTimes(1) - - gomock.InOrder( - m.credentialsStore.EXPECT().Get("user").Return(testCredentials, nil), - m.credentialsStore.EXPECT().Get("user").Return(testCredentials, nil), - m.pmapiClient.EXPECT().AuthRefresh("token").Return(nil, pmapi.ErrAPINotReachable), - m.eventListener.EXPECT().Emit(events.InternetOffEvent, ""), - - m.pmapiClient.EXPECT().ListLabels().Return(nil, pmapi.ErrAPINotReachable), - m.pmapiClient.EXPECT().Addresses().Return(nil), - m.pmapiClient.EXPECT().GetEvent("").Return(nil, pmapi.ErrAPINotReachable).AnyTimes(), - ) - - checkNewUserHasCredentials(testCredentials, m) -} - func TestNewUserAuthRefreshFails(t *testing.T) { m := initMocks(t) defer m.ctrl.Finish() diff --git a/internal/users/users.go b/internal/users/users.go index 05ef61ec..d397cd79 100644 --- a/internal/users/users.go +++ b/internal/users/users.go @@ -208,9 +208,6 @@ func (u *Users) Login(username, password string) (authClient pmapi.Client, auth // FinishLogin finishes the login procedure and adds the user into the credentials store. func (u *Users) FinishLogin(authClient pmapi.Client, auth *pmapi.Auth, mbPassphrase string) (user *User, err error) { //nolint[funlen] defer func() { - if err == pmapi.ErrUpgradeApplication { - u.events.Emit(events.UpgradeApplicationEvent, "") - } if err != nil { log.WithError(err).Debug("Login not finished; removing auth session") if delAuthErr := authClient.DeleteAuth(); delAuthErr != nil { diff --git a/internal/users/users_login_test.go b/internal/users/users_login_test.go index 97492f04..2025aef8 100644 --- a/internal/users/users_login_test.go +++ b/internal/users/users_login_test.go @@ -48,27 +48,6 @@ func TestUsersFinishLoginBadMailboxPassword(t *testing.T) { checkUsersFinishLogin(t, m, testAuth, testCredentials.MailboxPassword, "", err) } -func TestUsersFinishLoginUpgradeApplication(t *testing.T) { - m := initMocks(t) - defer m.ctrl.Finish() - - err := errors.New("Cannot logout when upgrade needed") - gomock.InOrder( - // Init users with no user from keychain. - m.credentialsStore.EXPECT().List().Return([]string{}, nil), - - // Set up mocks for FinishLogin. - m.pmapiClient.EXPECT().AuthSalt().Return("", nil), - m.pmapiClient.EXPECT().Unlock([]byte(testCredentials.MailboxPassword)).Return(pmapi.ErrUpgradeApplication), - - m.eventListener.EXPECT().Emit(events.UpgradeApplicationEvent, ""), - m.pmapiClient.EXPECT().DeleteAuth().Return(err), - m.pmapiClient.EXPECT().Logout(), - ) - - checkUsersFinishLogin(t, m, testAuth, testCredentials.MailboxPassword, "", pmapi.ErrUpgradeApplication) -} - func refreshWithToken(token string) *pmapi.Auth { return &pmapi.Auth{ RefreshToken: token, diff --git a/pkg/pmapi/client.go b/pkg/pmapi/client.go index 506b8e3b..6e41879c 100644 --- a/pkg/pmapi/client.go +++ b/pkg/pmapi/client.go @@ -103,8 +103,9 @@ type ClientConfig struct { // Zero means no limitation. MinBytesPerSecond int64 - NoConnectionHandler func() - ConnectionHandler func() + ConnectionOnHandler func() + ConnectionOffHandler func() + UpgradeApplicationHandler func() } // client is a client of the protonmail API. It implements the Client interface. @@ -265,6 +266,7 @@ func (c *client) doBuffered(req *http.Request, bodyBuffer []byte, retryUnauthori if res == nil { c.log.WithError(err).Error("Cannot get response") err = ErrAPINotReachable + c.cm.noConnection() } return } @@ -405,6 +407,12 @@ func (c *client) doJSONBuffered(req *http.Request, reqBodyBuffer []byte, data in } return c.doJSONBuffered(req, reqBodyBuffer, data) } + if errCode.Err() == ErrAPINotReachable { + c.cm.noConnection() + } + if errCode.Err() == ErrUpgradeApplication { + c.cm.config.UpgradeApplicationHandler() + } } if err := json.Unmarshal(resBody, data); err != nil { diff --git a/pkg/pmapi/clientmanager.go b/pkg/pmapi/clientmanager.go index c7cbfc3d..0b022f67 100644 --- a/pkg/pmapi/clientmanager.go +++ b/pkg/pmapi/clientmanager.go @@ -31,7 +31,7 @@ import ( const maxLogoutRetries = 5 // ClientManager is a manager of clients. -type ClientManager struct { +type ClientManager struct { //nolint[maligned] // newClient is used to create new Clients. By default this creates pmapi clients but it can be overridden to // create other types of clients (e.g. for integration tests). newClient func(userID string) Client @@ -61,6 +61,8 @@ type ClientManager struct { idGen idGen + connectionOff bool + log *logrus.Entry } @@ -108,6 +110,8 @@ func NewClientManager(config *ClientConfig) (cm *ClientManager) { proxyProvider: newProxyProvider(dohProviders, proxyQuery), proxyUseDuration: proxyUseDuration, + connectionOff: false, + log: logrus.WithField("pkg", "pmapi-manager"), } @@ -121,6 +125,30 @@ func NewClientManager(config *ClientConfig) (cm *ClientManager) { return cm } +func (cm *ClientManager) noConnection() { + cm.log.Trace("No connection available") + if cm.connectionOff { + return + } + + cm.log.Warn("Connection lost") + cm.config.ConnectionOffHandler() + cm.connectionOff = true + + go func() { + for { + time.Sleep(30 * time.Second) + + if err := cm.CheckConnection(); err == nil { + cm.log.Info("Connection re-established") + cm.config.ConnectionOnHandler() + cm.connectionOff = false + return + } + } + }() +} + // SetClientConstructor sets the method used to construct clients. // By default this is `pmapi.newClient` but can be overridden with this method. func (cm *ClientManager) SetClientConstructor(f func(userID string) Client) { diff --git a/pkg/pmapi/dialer_proxy.go b/pkg/pmapi/dialer_proxy.go index df25389e..5f341a01 100644 --- a/pkg/pmapi/dialer_proxy.go +++ b/pkg/pmapi/dialer_proxy.go @@ -37,17 +37,7 @@ func NewProxyTLSDialer(dialer TLSDialer, cm *ClientManager) *ProxyTLSDialer { } // DialTLS dials the given network/address. If it fails, it retries using a proxy. -func (d *ProxyTLSDialer) DialTLS(network, address string) (net.Conn, error) { - conn, err := d.dialTLS(network, address) - if err != nil { - d.cm.config.NoConnectionHandler() - } else { - d.cm.config.ConnectionHandler() - } - return conn, err -} - -func (d *ProxyTLSDialer) dialTLS(network, address string) (conn net.Conn, err error) { +func (d *ProxyTLSDialer) DialTLS(network, address string) (conn net.Conn, err error) { if conn, err = d.dialer.DialTLS(network, address); err == nil { return } diff --git a/test/context/imap.go b/test/context/imap.go index bcbe3775..edcbf74b 100644 --- a/test/context/imap.go +++ b/test/context/imap.go @@ -59,7 +59,7 @@ func (ctx *TestContext) withIMAPServer() { tls, _ := tls.New(settingsPath).GetConfig() backend := imap.NewIMAPBackend(ph, ctx.listener, ctx.cache, ctx.bridge) - server := imap.NewIMAPServer(true, true, port, tls, backend, ctx.listener) + server := imap.NewIMAPServer(ph, true, true, port, tls, backend, ctx.listener) go server.ListenAndServe() require.NoError(ctx.t, waitForPort(port, 5*time.Second))