1
0

Mitigate Apple Mail re-sync (both bodies and meta info)

This commit is contained in:
Michal Horejsek
2020-12-22 12:06:19 +01:00
parent 5117672388
commit 516ca018d3
24 changed files with 239 additions and 225 deletions

View File

@ -157,12 +157,15 @@ func New( // nolint[funlen]
} }
apiConfig := pmapi.GetAPIConfig(configName, constants.Version) apiConfig := pmapi.GetAPIConfig(configName, constants.Version)
apiConfig.NoConnectionHandler = func() { apiConfig.ConnectionOffHandler = func() {
eventListener.Emit(events.InternetOffEvent, "") eventListener.Emit(events.InternetOffEvent, "")
} }
apiConfig.ConnectionHandler = func() { apiConfig.ConnectionOnHandler = func() {
eventListener.Emit(events.InternetOnEvent, "") eventListener.Emit(events.InternetOnEvent, "")
} }
apiConfig.UpgradeApplicationHandler = func() {
eventListener.Emit(events.UpgradeApplicationEvent, "")
}
cm := pmapi.NewClientManager(apiConfig) cm := pmapi.NewClientManager(apiConfig)
cm.SetRoundTripper(pmapi.GetRoundTripper(cm, listener)) cm.SetRoundTripper(pmapi.GetRoundTripper(cm, listener))
cm.SetCookieJar(jar) cm.SetCookieJar(jar)

View File

@ -78,6 +78,7 @@ func run(b *base.Base, c *cli.Context) error { // nolint[funlen]
defer b.CrashHandler.HandlePanic() defer b.CrashHandler.HandlePanic()
imapPort := b.Settings.GetInt(settings.IMAPPortKey) imapPort := b.Settings.GetInt(settings.IMAPPortKey)
imap.NewIMAPServer( imap.NewIMAPServer(
b.CrashHandler,
c.String("log-imap") == "client" || c.String("log-imap") == "all", c.String("log-imap") == "client" || c.String("log-imap") == "all",
c.String("log-imap") == "server" || c.String("log-imap") == "all", c.String("log-imap") == "server" || c.String("log-imap") == "all",
imapPort, tlsConfig, imapBackend, b.Listener).ListenAndServe() imapPort, tlsConfig, imapBackend, b.Listener).ListenAndServe()

View File

@ -172,7 +172,6 @@ func (a *Accounts) showLoginError(err error, scope string) bool {
} }
a.qml.SetConnectionStatus(true) // If we are here connection is ok. a.qml.SetConnectionStatus(true) // If we are here connection is ok.
if err == pmapi.ErrUpgradeApplication { if err == pmapi.ErrUpgradeApplication {
a.qml.EmitEvent(events.UpgradeApplicationEvent, "")
return true return true
} }
a.qml.SetAddAccountWarning(err.Error(), -1) a.qml.SetAddAccountWarning(err.Error(), -1)

View File

@ -128,7 +128,6 @@ func (s *FrontendQt) showLoginError(err error, scope string) bool {
} }
s.Qml.SetConnectionStatus(true) // If we are here connection is ok. s.Qml.SetConnectionStatus(true) // If we are here connection is ok.
if err == pmapi.ErrUpgradeApplication { if err == pmapi.ErrUpgradeApplication {
s.eventListener.Emit(events.UpgradeApplicationEvent, "")
return true return true
} }
s.Qml.SetAddAccountWarning(err.Error(), -1) s.Qml.SetAddAccountWarning(err.Error(), -1)

View File

@ -37,7 +37,7 @@ type panicHandler interface {
type imapBackend struct { type imapBackend struct {
panicHandler panicHandler panicHandler panicHandler
bridge bridger bridge bridger
updates chan goIMAPBackend.Update updates *imapUpdates
eventListener listener.Listener eventListener listener.Listener
users map[string]*imapUser users map[string]*imapUser
@ -46,9 +46,6 @@ type imapBackend struct {
imapCache map[string]map[string]string imapCache map[string]map[string]string
imapCachePath string imapCachePath string
imapCacheLock *sync.RWMutex imapCacheLock *sync.RWMutex
updatesBlocking map[string]bool
updatesBlockingLocker sync.Locker
} }
// NewIMAPBackend returns struct implementing go-imap/backend interface. // NewIMAPBackend returns struct implementing go-imap/backend interface.
@ -75,7 +72,7 @@ func newIMAPBackend(
return &imapBackend{ return &imapBackend{
panicHandler: panicHandler, panicHandler: panicHandler,
bridge: bridge, bridge: bridge,
updates: make(chan goIMAPBackend.Update), updates: newIMAPUpdates(),
eventListener: eventListener, eventListener: eventListener,
users: map[string]*imapUser{}, users: map[string]*imapUser{},
@ -83,9 +80,6 @@ func newIMAPBackend(
imapCachePath: cache.GetIMAPCachePath(), imapCachePath: cache.GetIMAPCachePath(),
imapCacheLock: &sync.RWMutex{}, imapCacheLock: &sync.RWMutex{},
updatesBlocking: map[string]bool{},
updatesBlockingLocker: &sync.Mutex{},
} }
} }
@ -172,7 +166,7 @@ func (ib *imapBackend) Login(_ *imap.ConnInfo, username, password string) (goIMA
// so that it doesn't make bridge slow for users who are only using bridge for SMTP // so that it doesn't make bridge slow for users who are only using bridge for SMTP
// (otherwise the store will be locked for 1 sec per email during synchronization). // (otherwise the store will be locked for 1 sec per email during synchronization).
if store := imapUser.user.GetStore(); store != nil { if store := imapUser.user.GetStore(); store != nil {
store.SetChangeNotifier(ib) store.SetChangeNotifier(ib.updates)
} }
return imapUser, nil return imapUser, nil
@ -183,7 +177,7 @@ func (ib *imapBackend) Updates() <-chan goIMAPBackend.Update {
// Called from go-imap in goroutines - we need to handle panics for each function. // Called from go-imap in goroutines - we need to handle panics for each function.
defer ib.panicHandler.HandlePanic() defer ib.panicHandler.HandlePanic()
return ib.updates return ib.updates.ch
} }
func (ib *imapBackend) CreateMessageLimit() *uint32 { func (ib *imapBackend) CreateMessageLimit() *uint32 {

View File

@ -188,8 +188,8 @@ func (im *imapMailbox) Expunge() error {
// the desired mailbox. // the desired mailbox.
im.user.waitForAppend() im.user.waitForAppend()
im.user.backend.setUpdatesBeBlocking(im.user.currentAddressLowercase, im.name, operationDeleteMessage) im.user.backend.updates.block(im.user.currentAddressLowercase, im.name, operationDeleteMessage)
defer im.user.backend.unsetUpdatesBeBlocking(im.user.currentAddressLowercase, im.name, operationDeleteMessage) defer im.user.backend.updates.unblock(im.user.currentAddressLowercase, im.name, operationDeleteMessage)
return im.storeMailbox.RemoveDeleted(nil) return im.storeMailbox.RemoveDeleted(nil)
} }

View File

@ -263,6 +263,7 @@ func (im *imapMailbox) getMessage(storeMessage storeMessageProvider, items []ima
// Size attribute on the server counts encrypted data. The value is cleared // Size attribute on the server counts encrypted data. The value is cleared
// on our part and we need to compute "real" size of decrypted data. // on our part and we need to compute "real" size of decrypted data.
if m.Size <= 0 { if m.Size <= 0 {
im.log.WithField("msgID", storeMessage.ID()).Trace("Size unknown - downloading body")
if _, _, err = im.getBodyStructure(storeMessage); err != nil { if _, _, err = im.getBodyStructure(storeMessage); err != nil {
return return
} }
@ -308,14 +309,14 @@ func (im *imapMailbox) getBodyStructure(storeMessage storeMessageProvider) (
if bodyReader, structure = cache.LoadMail(id); bodyReader.Len() == 0 || structure == nil { if bodyReader, structure = cache.LoadMail(id); bodyReader.Len() == 0 || structure == nil {
var body []byte var body []byte
structure, body, err = im.buildMessage(m) structure, body, err = im.buildMessage(m)
m.Size = int64(len(body))
if err := storeMessage.SetSize(m.Size); err != nil {
im.log.WithError(err).
WithField("newSize", m.Size).
WithField("msgID", m.ID).
Warn("Cannot update size while building")
}
if err == nil && structure != nil && len(body) > 0 { if err == nil && structure != nil && len(body) > 0 {
m.Size = int64(len(body))
if err := storeMessage.SetSize(m.Size); err != nil {
im.log.WithError(err).
WithField("newSize", m.Size).
WithField("msgID", m.ID).
Warn("Cannot update size while building")
}
if err := storeMessage.SetContentTypeAndHeader(m.MIMEType, m.Header); err != nil { if err := storeMessage.SetContentTypeAndHeader(m.MIMEType, m.Header); err != nil {
im.log.WithError(err). im.log.WithError(err).
WithField("msgID", m.ID). WithField("msgID", m.ID).

View File

@ -46,8 +46,8 @@ func (im *imapMailbox) UpdateMessagesFlags(uid bool, seqSet *imap.SeqSet, operat
// Called from go-imap in goroutines - we need to handle panics for each function. // Called from go-imap in goroutines - we need to handle panics for each function.
defer im.panicHandler.HandlePanic() defer im.panicHandler.HandlePanic()
im.user.backend.setUpdatesBeBlocking(im.user.currentAddressLowercase, im.name, operationUpdateMessage) im.user.backend.updates.block(im.user.currentAddressLowercase, im.name, operationUpdateMessage)
defer im.user.backend.unsetUpdatesBeBlocking(im.user.currentAddressLowercase, im.name, operationUpdateMessage) defer im.user.backend.updates.unblock(im.user.currentAddressLowercase, im.name, operationUpdateMessage)
messageIDs, err := im.apiIDsFromSeqSet(uid, seqSet) messageIDs, err := im.apiIDsFromSeqSet(uid, seqSet)
if err != nil || len(messageIDs) == 0 { if err != nil || len(messageIDs) == 0 {
@ -455,8 +455,8 @@ func (im *imapMailbox) ListMessages(isUID bool, seqSet *imap.SeqSet, items []ima
// EXPUNGE cannot be sent during listing and can come only from // EXPUNGE cannot be sent during listing and can come only from
// the event loop, so we prevent any server side update to avoid // the event loop, so we prevent any server side update to avoid
// the problem. // the problem.
im.user.pauseEventLoop() im.user.backend.updates.forbidExpunge(im.storeMailbox.LabelID())
defer im.user.unpauseEventLoop() defer im.user.backend.updates.allowExpunge(im.storeMailbox.LabelID())
} }
var markAsReadIDs []string var markAsReadIDs []string

View File

@ -43,16 +43,15 @@ import (
) )
type imapServer struct { type imapServer struct {
panicHandler panicHandler
server *imapserver.Server server *imapserver.Server
eventListener listener.Listener eventListener listener.Listener
debugClient bool debugClient bool
debugServer bool debugServer bool
on bool
} }
// NewIMAPServer constructs a new IMAP server configured with the given options. // NewIMAPServer constructs a new IMAP server configured with the given options.
func NewIMAPServer(debugClient, debugServer bool, port int, tls *tls.Config, imapBackend *imapBackend, eventListener listener.Listener) *imapServer { //nolint[golint] func NewIMAPServer(panicHandler panicHandler, debugClient, debugServer bool, port int, tls *tls.Config, imapBackend *imapBackend, eventListener listener.Listener) *imapServer { //nolint[golint]
s := imapserver.New(imapBackend) s := imapserver.New(imapBackend)
s.Addr = fmt.Sprintf("%v:%v", bridge.Host, port) s.Addr = fmt.Sprintf("%v:%v", bridge.Host, port)
s.TLSConfig = tls s.TLSConfig = tls
@ -98,6 +97,7 @@ func NewIMAPServer(debugClient, debugServer bool, port int, tls *tls.Config, ima
) )
return &imapServer{ return &imapServer{
panicHandler: panicHandler,
server: s, server: s,
eventListener: eventListener, eventListener: eventListener,
debugClient: debugClient, debugClient: debugClient,
@ -114,14 +114,6 @@ func (s *imapServer) ListenAndServe() {
} }
func (s *imapServer) listenAndServe() { func (s *imapServer) listenAndServe() {
if s.on {
return
}
s.on = true
defer func() {
s.on = false
}()
log.Info("IMAP server listening at ", s.server.Addr) log.Info("IMAP server listening at ", s.server.Addr)
l, err := net.Listen("tcp", s.server.Addr) l, err := net.Listen("tcp", s.server.Addr)
if err != nil { if err != nil {
@ -135,9 +127,18 @@ func (s *imapServer) listenAndServe() {
server: s, server: s,
}) })
if err != nil { if err != nil {
s.eventListener.Emit(events.ErrorEvent, "IMAP failed: "+err.Error()) failed := true
log.Error("IMAP failed: ", err) if netErr, ok := err.(*net.OpError); ok {
return originalErr := netErr.Unwrap()
if originalErr != nil && originalErr.Error() == "use of closed network connection" {
failed = false
}
}
if failed {
s.eventListener.Emit(events.ErrorEvent, "IMAP failed: "+err.Error())
log.Error("IMAP failed: ", err)
return
}
} }
defer s.server.Close() //nolint[errcheck] defer s.server.Close() //nolint[errcheck]
@ -146,6 +147,7 @@ func (s *imapServer) listenAndServe() {
// Stops the server. // Stops the server.
func (s *imapServer) Close() { func (s *imapServer) Close() {
log.Info("Closing IMAP server")
if err := s.server.Close(); err != nil { if err := s.server.Close(); err != nil {
log.WithError(err).Error("Failed to close the connection") log.WithError(err).Error("Failed to close the connection")
} }
@ -157,16 +159,30 @@ func (s *imapServer) monitorInternetConnection() {
off := make(chan string) off := make(chan string)
s.eventListener.Add(events.InternetOffEvent, off) s.eventListener.Add(events.InternetOffEvent, off)
go func() { isOn := true
for range on { for {
s.listenAndServe() select {
} case <-on:
}() if isOn {
go func() { continue
for range off { }
isOn = true
go func() {
defer s.panicHandler.HandlePanic()
s.listenAndServe()
}()
case <-off:
if !isOn {
continue
}
isOn = false
s.Close() s.Close()
} }
}() // Give it some time to serve or close server before changing it again.
// E.g., if we get quickly off-on signal, starting or closing could
// fail because server is still running or not yet, respectively.
time.Sleep(10 * time.Second)
}
} }
func (s *imapServer) monitorDisconnectedUsers() { func (s *imapServer) monitorDisconnectedUsers() {

View File

@ -42,8 +42,6 @@ type storeUserProvider interface {
attachedPublicKeyName string, attachedPublicKeyName string,
parentID string) (*pmapi.Message, []*pmapi.Attachment, error) parentID string) (*pmapi.Message, []*pmapi.Attachment, error)
PauseEventLoop(bool)
SetChangeNotifier(store.ChangeNotifier) SetChangeNotifier(store.ChangeNotifier)
} }

View File

@ -19,6 +19,7 @@ package imap
import ( import (
"strings" "strings"
"sync"
"time" "time"
"github.com/ProtonMail/proton-bridge/internal/store" "github.com/ProtonMail/proton-bridge/internal/store"
@ -36,32 +37,82 @@ const (
operationDeleteMessage operation = "expunge" operationDeleteMessage operation = "expunge"
) )
func (ib *imapBackend) setUpdatesBeBlocking(address, mailboxName string, op operation) { type imapUpdates struct {
ib.changeUpdatesBlocking(address, mailboxName, op, true) lock sync.Locker
blocking map[string]bool
delayedExpunges map[string][]chan struct{}
ch chan goIMAPBackend.Update
} }
func (ib *imapBackend) unsetUpdatesBeBlocking(address, mailboxName string, op operation) { func newIMAPUpdates() *imapUpdates {
ib.changeUpdatesBlocking(address, mailboxName, op, false) return &imapUpdates{
} lock: &sync.Mutex{},
blocking: map[string]bool{},
func (ib *imapBackend) changeUpdatesBlocking(address, mailboxName string, op operation, block bool) { delayedExpunges: map[string][]chan struct{}{},
ib.updatesBlockingLocker.Lock() ch: make(chan goIMAPBackend.Update),
defer ib.updatesBlockingLocker.Unlock()
key := strings.ToLower(address + "_" + mailboxName + "_" + string(op))
if block {
ib.updatesBlocking[key] = true
} else {
delete(ib.updatesBlocking, key)
} }
} }
func (ib *imapBackend) isBlocking(address, mailboxName string, op operation) bool { func (iu *imapUpdates) block(address, mailboxName string, op operation) {
key := strings.ToLower(address + "_" + mailboxName + "_" + string(op)) iu.lock.Lock()
return ib.updatesBlocking[key] defer iu.lock.Unlock()
iu.blocking[getBlockingKey(address, mailboxName, op)] = true
} }
func (ib *imapBackend) Notice(address, notice string) { func (iu *imapUpdates) unblock(address, mailboxName string, op operation) {
iu.lock.Lock()
defer iu.lock.Unlock()
delete(iu.blocking, getBlockingKey(address, mailboxName, op))
}
func (iu *imapUpdates) isBlocking(address, mailboxName string, op operation) bool {
iu.lock.Lock()
defer iu.lock.Unlock()
return iu.blocking[getBlockingKey(address, mailboxName, op)]
}
func getBlockingKey(address, mailboxName string, op operation) string {
return strings.ToLower(address + "_" + mailboxName + "_" + string(op))
}
func (iu *imapUpdates) forbidExpunge(mailboxID string) {
iu.lock.Lock()
defer iu.lock.Unlock()
iu.delayedExpunges[mailboxID] = []chan struct{}{}
}
func (iu *imapUpdates) allowExpunge(mailboxID string) {
iu.lock.Lock()
defer iu.lock.Unlock()
for _, ch := range iu.delayedExpunges[mailboxID] {
close(ch)
}
delete(iu.delayedExpunges, mailboxID)
}
func (iu *imapUpdates) CanDelete(mailboxID string) (bool, func()) {
iu.lock.Lock()
defer iu.lock.Unlock()
if iu.delayedExpunges[mailboxID] == nil {
return true, nil
}
ch := make(chan struct{})
iu.delayedExpunges[mailboxID] = append(iu.delayedExpunges[mailboxID], ch)
return false, func() {
log.WithField("mailbox", mailboxID).Debug("Expunge operations paused")
<-ch
log.WithField("mailbox", mailboxID).Debug("Expunge operations unpaused")
}
}
func (iu *imapUpdates) Notice(address, notice string) {
update := new(goIMAPBackend.StatusUpdate) update := new(goIMAPBackend.StatusUpdate)
update.Update = goIMAPBackend.NewUpdate(address, "") update.Update = goIMAPBackend.NewUpdate(address, "")
update.StatusResp = &imap.StatusResp{ update.StatusResp = &imap.StatusResp{
@ -69,10 +120,10 @@ func (ib *imapBackend) Notice(address, notice string) {
Code: imap.CodeAlert, Code: imap.CodeAlert,
Info: notice, Info: notice,
} }
ib.sendIMAPUpdate(update, false) iu.sendIMAPUpdate(update, false)
} }
func (ib *imapBackend) UpdateMessage( func (iu *imapUpdates) UpdateMessage(
address, mailboxName string, address, mailboxName string,
uid, sequenceNumber uint32, uid, sequenceNumber uint32,
msg *pmapi.Message, hasDeletedFlag bool, msg *pmapi.Message, hasDeletedFlag bool,
@ -93,10 +144,10 @@ func (ib *imapBackend) UpdateMessage(
update.Message.Flags = append(update.Message.Flags, imap.DeletedFlag) update.Message.Flags = append(update.Message.Flags, imap.DeletedFlag)
} }
update.Message.Uid = uid update.Message.Uid = uid
ib.sendIMAPUpdate(update, ib.isBlocking(address, mailboxName, operationUpdateMessage)) iu.sendIMAPUpdate(update, iu.isBlocking(address, mailboxName, operationUpdateMessage))
} }
func (ib *imapBackend) DeleteMessage(address, mailboxName string, sequenceNumber uint32) { func (iu *imapUpdates) DeleteMessage(address, mailboxName string, sequenceNumber uint32) {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"address": address, "address": address,
"mailbox": mailboxName, "mailbox": mailboxName,
@ -105,10 +156,10 @@ func (ib *imapBackend) DeleteMessage(address, mailboxName string, sequenceNumber
update := new(goIMAPBackend.ExpungeUpdate) update := new(goIMAPBackend.ExpungeUpdate)
update.Update = goIMAPBackend.NewUpdate(address, mailboxName) update.Update = goIMAPBackend.NewUpdate(address, mailboxName)
update.SeqNum = sequenceNumber update.SeqNum = sequenceNumber
ib.sendIMAPUpdate(update, ib.isBlocking(address, mailboxName, operationDeleteMessage)) iu.sendIMAPUpdate(update, iu.isBlocking(address, mailboxName, operationDeleteMessage))
} }
func (ib *imapBackend) MailboxCreated(address, mailboxName string) { func (iu *imapUpdates) MailboxCreated(address, mailboxName string) {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"address": address, "address": address,
"mailbox": mailboxName, "mailbox": mailboxName,
@ -120,10 +171,10 @@ func (ib *imapBackend) MailboxCreated(address, mailboxName string) {
Delimiter: store.PathDelimiter, Delimiter: store.PathDelimiter,
Name: mailboxName, Name: mailboxName,
} }
ib.sendIMAPUpdate(update, false) iu.sendIMAPUpdate(update, false)
} }
func (ib *imapBackend) MailboxStatus(address, mailboxName string, total, unread, unreadSeqNum uint32) { func (iu *imapUpdates) MailboxStatus(address, mailboxName string, total, unread, unreadSeqNum uint32) {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"address": address, "address": address,
"mailbox": mailboxName, "mailbox": mailboxName,
@ -137,11 +188,11 @@ func (ib *imapBackend) MailboxStatus(address, mailboxName string, total, unread,
update.MailboxStatus.Messages = total update.MailboxStatus.Messages = total
update.MailboxStatus.Unseen = unread update.MailboxStatus.Unseen = unread
update.MailboxStatus.UnseenSeqNum = unreadSeqNum update.MailboxStatus.UnseenSeqNum = unreadSeqNum
ib.sendIMAPUpdate(update, false) iu.sendIMAPUpdate(update, false)
} }
func (ib *imapBackend) sendIMAPUpdate(update goIMAPBackend.Update, block bool) { func (iu *imapUpdates) sendIMAPUpdate(update goIMAPBackend.Update, block bool) {
if ib.updates == nil { if iu.ch == nil {
log.Trace("IMAP IDLE unavailable") log.Trace("IMAP IDLE unavailable")
return return
} }
@ -152,7 +203,7 @@ func (ib *imapBackend) sendIMAPUpdate(update goIMAPBackend.Update, block bool) {
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
log.Warn("IMAP update could not be sent (timeout)") log.Warn("IMAP update could not be sent (timeout)")
return return
case ib.updates <- update: case iu.ch <- update:
} }
}() }()

View File

@ -42,9 +42,6 @@ type imapUser struct {
currentAddressLowercase string currentAddressLowercase string
appendInProcess sync.WaitGroup appendInProcess sync.WaitGroup
eventLoopPausingCounter int
eventLoopPausingLocker sync.Locker
} }
// newIMAPUser returns struct implementing go-imap/user interface. // newIMAPUser returns struct implementing go-imap/user interface.
@ -76,8 +73,6 @@ func newIMAPUser(
storeAddress: storeAddress, storeAddress: storeAddress,
currentAddressLowercase: strings.ToLower(address), currentAddressLowercase: strings.ToLower(address),
eventLoopPausingLocker: &sync.Mutex{},
}, err }, err
} }
@ -86,31 +81,6 @@ func (iu *imapUser) client() pmapi.Client {
return iu.user.GetTemporaryPMAPIClient() return iu.user.GetTemporaryPMAPIClient()
} }
// pauseEventLoop pauses event loop and increases the number of mailboxes which
// is performing action forbidding event loop to run (such as FETCH which needs
// UIDs to be stable and thus EXPUNGE cannot be done during the request).
func (iu *imapUser) pauseEventLoop() {
iu.eventLoopPausingLocker.Lock()
defer iu.eventLoopPausingLocker.Unlock()
iu.eventLoopPausingCounter++
iu.storeUser.PauseEventLoop(true)
}
// unpauseEventLoop unpauses event loop but only if no other request is not
// performing action forbidding event loop to run (see pauseEventLoop).
func (iu *imapUser) unpauseEventLoop() {
iu.eventLoopPausingLocker.Lock()
defer iu.eventLoopPausingLocker.Unlock()
if iu.eventLoopPausingCounter > 0 {
iu.eventLoopPausingCounter--
}
if iu.eventLoopPausingCounter == 0 {
iu.storeUser.PauseEventLoop(false)
}
}
func (iu *imapUser) isSubscribed(labelID string) bool { func (iu *imapUser) isSubscribed(labelID string) bool {
subscriptionExceptions := iu.backend.getCacheList(iu.storeUser.UserID(), SubscriptionException) subscriptionExceptions := iu.backend.getCacheList(iu.storeUser.UserID(), SubscriptionException)
exceptions := strings.Split(subscriptionExceptions, ";") exceptions := strings.Split(subscriptionExceptions, ";")

View File

@ -30,6 +30,8 @@ type ChangeNotifier interface {
DeleteMessage(address, mailboxName string, sequenceNumber uint32) DeleteMessage(address, mailboxName string, sequenceNumber uint32)
MailboxCreated(address, mailboxName string) MailboxCreated(address, mailboxName string)
MailboxStatus(address, mailboxName string, total, unread, unreadSeqNum uint32) MailboxStatus(address, mailboxName string, total, unread, unreadSeqNum uint32)
CanDelete(mailboxID string) (bool, func())
} }
// SetChangeNotifier sets notifier to be called once mailbox or message changes. // SetChangeNotifier sets notifier to be called once mailbox or message changes.

View File

@ -39,8 +39,6 @@ type eventLoop struct {
stopCh chan struct{} stopCh chan struct{}
notifyStopCh chan struct{} notifyStopCh chan struct{}
isRunning bool // The whole event loop is running. isRunning bool // The whole event loop is running.
isTickerPaused bool // The periodic loop is paused (but the event loop itself is still running).
hasInternet bool
pollCounter int pollCounter int
@ -60,7 +58,6 @@ func newEventLoop(cache *Cache, store *Store, user BridgeUser, events listener.L
currentEventID: cache.getEventID(user.ID()), currentEventID: cache.getEventID(user.ID()),
pollCh: make(chan chan struct{}), pollCh: make(chan chan struct{}),
isRunning: false, isRunning: false,
isTickerPaused: false,
log: eventLog, log: eventLog,
@ -135,8 +132,6 @@ func (loop *eventLoop) start() {
loop.log.WithField("lastEventID", loop.currentEventID).Warn("Subscription stopped") loop.log.WithField("lastEventID", loop.currentEventID).Warn("Subscription stopped")
}() }()
loop.hasInternet = true
go loop.pollNow() go loop.pollNow()
loop.loop() loop.loop()
@ -154,10 +149,6 @@ func (loop *eventLoop) loop() {
close(loop.notifyStopCh) close(loop.notifyStopCh)
return return
case <-t.C: case <-t.C:
if loop.isTickerPaused {
loop.log.Trace("Event loop paused, skipping")
continue
}
// Randomise periodic calls within range pollInterval ± pollSpread to reduces potential load spikes on API. // Randomise periodic calls within range pollInterval ± pollSpread to reduces potential load spikes on API.
time.Sleep(time.Duration(rand.Intn(2*int(pollIntervalSpread.Milliseconds()))) * time.Millisecond) time.Sleep(time.Duration(rand.Intn(2*int(pollIntervalSpread.Milliseconds()))) * time.Millisecond)
case eventProcessedCh = <-loop.pollCh: case eventProcessedCh = <-loop.pollCh:
@ -220,8 +211,6 @@ func (loop *eventLoop) processNextEvent() (more bool, err error) { // nolint[fun
defer func() { defer func() {
if errors.Cause(err) == pmapi.ErrAPINotReachable { if errors.Cause(err) == pmapi.ErrAPINotReachable {
l.Warn("Internet unavailable") l.Warn("Internet unavailable")
loop.events.Emit(bridgeEvents.InternetOffEvent, "")
loop.hasInternet = false
err = nil err = nil
} }
@ -233,7 +222,6 @@ func (loop *eventLoop) processNextEvent() (more bool, err error) { // nolint[fun
if errors.Cause(err) == pmapi.ErrUpgradeApplication { if errors.Cause(err) == pmapi.ErrUpgradeApplication {
l.Warn("Need to upgrade application") l.Warn("Need to upgrade application")
loop.events.Emit(bridgeEvents.UpgradeApplicationEvent, "")
err = nil err = nil
} }
@ -267,11 +255,6 @@ func (loop *eventLoop) processNextEvent() (more bool, err error) { // nolint[fun
l = l.WithField("newEventID", event.EventID) l = l.WithField("newEventID", event.EventID)
if !loop.hasInternet {
loop.events.Emit(bridgeEvents.InternetOnEvent, "")
loop.hasInternet = true
}
if err = loop.processEvent(event); err != nil { if err = loop.processEvent(event); err != nil {
return false, errors.Wrap(err, "failed to process event") return false, errors.Wrap(err, "failed to process event")
} }
@ -464,6 +447,7 @@ func (loop *eventLoop) processMessages(eventLog *logrus.Entry, messages []*pmapi
updateMessage(msgLog, msg, message.Updated) updateMessage(msgLog, msg, message.Updated)
loop.removeLabelFromMessageWait(message.Updated.LabelIDsRemoved)
if err = loop.store.createOrUpdateMessageEvent(msg); err != nil { if err = loop.store.createOrUpdateMessageEvent(msg); err != nil {
return errors.Wrap(err, "failed to update message in DB") return errors.Wrap(err, "failed to update message in DB")
} }
@ -471,6 +455,7 @@ func (loop *eventLoop) processMessages(eventLog *logrus.Entry, messages []*pmapi
case pmapi.EventDelete: case pmapi.EventDelete:
msgLog.Debug("Processing EventDelete for message") msgLog.Debug("Processing EventDelete for message")
loop.removeMessageWait(message.ID)
if err = loop.store.deleteMessageEvent(message.ID); err != nil { if err = loop.store.deleteMessageEvent(message.ID); err != nil {
return errors.Wrap(err, "failed to delete message from DB") return errors.Wrap(err, "failed to delete message from DB")
} }
@ -480,6 +465,40 @@ func (loop *eventLoop) processMessages(eventLog *logrus.Entry, messages []*pmapi
return err return err
} }
// removeMessageWait waits for notifier to be ready to accept delete
// operations for given message. It's no-op if message does not exist.
func (loop *eventLoop) removeMessageWait(msgID string) {
msg, err := loop.store.getMessageFromDB(msgID)
if err != nil {
return
}
loop.removeLabelFromMessageWait(msg.LabelIDs)
}
// removeLabelFromMessageWait waits for notifier to be ready to accept
// delete operations for given labels.
func (loop *eventLoop) removeLabelFromMessageWait(labelIDs []string) {
if len(labelIDs) == 0 || loop.store.notifier == nil {
return
}
for {
wasWaiting := false
for _, labelID := range labelIDs {
canDelete, wait := loop.store.notifier.CanDelete(labelID)
if !canDelete {
wasWaiting = true
wait()
}
}
// If we had to wait for some label, we need to check again
// all labels in case something changed in the meantime.
if !wasWaiting {
return
}
}
}
func updateMessage(msgLog *logrus.Entry, message *pmapi.Message, updates *pmapi.EventMessageUpdated) { //nolint[funlen] func updateMessage(msgLog *logrus.Entry, message *pmapi.Message, updates *pmapi.EventMessageUpdated) { //nolint[funlen]
msgLog.Debug("Updating message") msgLog.Debug("Updating message")

View File

@ -266,6 +266,21 @@ func (m *MockChangeNotifier) EXPECT() *MockChangeNotifierMockRecorder {
return m.recorder return m.recorder
} }
// CanDelete mocks base method
func (m *MockChangeNotifier) CanDelete(arg0 string) (bool, func()) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CanDelete", arg0)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(func())
return ret0, ret1
}
// CanDelete indicates an expected call of CanDelete
func (mr *MockChangeNotifierMockRecorder) CanDelete(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanDelete", reflect.TypeOf((*MockChangeNotifier)(nil).CanDelete), arg0)
}
// DeleteMessage mocks base method // DeleteMessage mocks base method
func (m *MockChangeNotifier) DeleteMessage(arg0, arg1 string, arg2 uint32) { func (m *MockChangeNotifier) DeleteMessage(arg0, arg1 string, arg2 uint32) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -350,18 +350,6 @@ func (store *Store) addAddress(address, addressID string, labels []*pmapi.Label)
return return
} }
// PauseEventLoop sets whether the ticker is periodically polling or not.
func (store *Store) PauseEventLoop(pause bool) {
store.lock.Lock()
defer store.lock.Unlock()
store.log.WithField("pause", pause).Info("Pausing event loop")
if store.eventLoop != nil {
store.eventLoop.isTickerPaused = pause
}
}
// Close stops the event loop and closes the database to free the file. // Close stops the event loop and closes the database to free the file.
func (store *Store) Close() error { func (store *Store) Close() error {
store.lock.Lock() store.lock.Lock()

View File

@ -152,12 +152,7 @@ func (u *User) authorizeIfNecessary(emitEvent bool) (err error) {
u.log.WithError(err).Error("Could not authorize and unlock user") u.log.WithError(err).Error("Could not authorize and unlock user")
switch errors.Cause(err) { switch errors.Cause(err) {
case pmapi.ErrUpgradeApplication: case pmapi.ErrUpgradeApplication, pmapi.ErrAPINotReachable:
u.listener.Emit(events.UpgradeApplicationEvent, "")
case pmapi.ErrAPINotReachable:
u.listener.Emit(events.InternetOffEvent, "")
default: default:
if errLogout := u.credStorer.Logout(u.userID); errLogout != nil { if errLogout := u.credStorer.Logout(u.userID); errLogout != nil {
u.log.WithField("err", errLogout).Error("Could not log user out from credentials store") u.log.WithField("err", errLogout).Error("Could not log user out from credentials store")

View File

@ -23,7 +23,6 @@ import (
"github.com/ProtonMail/proton-bridge/internal/events" "github.com/ProtonMail/proton-bridge/internal/events"
"github.com/ProtonMail/proton-bridge/internal/users/credentials" "github.com/ProtonMail/proton-bridge/internal/users/credentials"
"github.com/ProtonMail/proton-bridge/pkg/pmapi"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
a "github.com/stretchr/testify/assert" a "github.com/stretchr/testify/assert"
) )
@ -38,44 +37,6 @@ func TestNewUserNoCredentialsStore(t *testing.T) {
a.Error(t, err) a.Error(t, err)
} }
func TestNewUserAppOutdated(t *testing.T) {
m := initMocks(t)
defer m.ctrl.Finish()
m.clientManager.EXPECT().GetClient("user").Return(m.pmapiClient).MinTimes(1)
gomock.InOrder(
m.credentialsStore.EXPECT().Get("user").Return(testCredentials, nil),
m.credentialsStore.EXPECT().Get("user").Return(testCredentials, nil),
m.pmapiClient.EXPECT().AuthRefresh("token").Return(nil, pmapi.ErrUpgradeApplication),
m.eventListener.EXPECT().Emit(events.UpgradeApplicationEvent, ""),
m.pmapiClient.EXPECT().ListLabels().Return(nil, pmapi.ErrUpgradeApplication),
m.pmapiClient.EXPECT().Addresses().Return(nil),
)
checkNewUserHasCredentials(testCredentials, m)
}
func TestNewUserNoInternetConnection(t *testing.T) {
m := initMocks(t)
defer m.ctrl.Finish()
m.clientManager.EXPECT().GetClient("user").Return(m.pmapiClient).MinTimes(1)
gomock.InOrder(
m.credentialsStore.EXPECT().Get("user").Return(testCredentials, nil),
m.credentialsStore.EXPECT().Get("user").Return(testCredentials, nil),
m.pmapiClient.EXPECT().AuthRefresh("token").Return(nil, pmapi.ErrAPINotReachable),
m.eventListener.EXPECT().Emit(events.InternetOffEvent, ""),
m.pmapiClient.EXPECT().ListLabels().Return(nil, pmapi.ErrAPINotReachable),
m.pmapiClient.EXPECT().Addresses().Return(nil),
m.pmapiClient.EXPECT().GetEvent("").Return(nil, pmapi.ErrAPINotReachable).AnyTimes(),
)
checkNewUserHasCredentials(testCredentials, m)
}
func TestNewUserAuthRefreshFails(t *testing.T) { func TestNewUserAuthRefreshFails(t *testing.T) {
m := initMocks(t) m := initMocks(t)
defer m.ctrl.Finish() defer m.ctrl.Finish()

View File

@ -208,9 +208,6 @@ func (u *Users) Login(username, password string) (authClient pmapi.Client, auth
// FinishLogin finishes the login procedure and adds the user into the credentials store. // FinishLogin finishes the login procedure and adds the user into the credentials store.
func (u *Users) FinishLogin(authClient pmapi.Client, auth *pmapi.Auth, mbPassphrase string) (user *User, err error) { //nolint[funlen] func (u *Users) FinishLogin(authClient pmapi.Client, auth *pmapi.Auth, mbPassphrase string) (user *User, err error) { //nolint[funlen]
defer func() { defer func() {
if err == pmapi.ErrUpgradeApplication {
u.events.Emit(events.UpgradeApplicationEvent, "")
}
if err != nil { if err != nil {
log.WithError(err).Debug("Login not finished; removing auth session") log.WithError(err).Debug("Login not finished; removing auth session")
if delAuthErr := authClient.DeleteAuth(); delAuthErr != nil { if delAuthErr := authClient.DeleteAuth(); delAuthErr != nil {

View File

@ -48,27 +48,6 @@ func TestUsersFinishLoginBadMailboxPassword(t *testing.T) {
checkUsersFinishLogin(t, m, testAuth, testCredentials.MailboxPassword, "", err) checkUsersFinishLogin(t, m, testAuth, testCredentials.MailboxPassword, "", err)
} }
func TestUsersFinishLoginUpgradeApplication(t *testing.T) {
m := initMocks(t)
defer m.ctrl.Finish()
err := errors.New("Cannot logout when upgrade needed")
gomock.InOrder(
// Init users with no user from keychain.
m.credentialsStore.EXPECT().List().Return([]string{}, nil),
// Set up mocks for FinishLogin.
m.pmapiClient.EXPECT().AuthSalt().Return("", nil),
m.pmapiClient.EXPECT().Unlock([]byte(testCredentials.MailboxPassword)).Return(pmapi.ErrUpgradeApplication),
m.eventListener.EXPECT().Emit(events.UpgradeApplicationEvent, ""),
m.pmapiClient.EXPECT().DeleteAuth().Return(err),
m.pmapiClient.EXPECT().Logout(),
)
checkUsersFinishLogin(t, m, testAuth, testCredentials.MailboxPassword, "", pmapi.ErrUpgradeApplication)
}
func refreshWithToken(token string) *pmapi.Auth { func refreshWithToken(token string) *pmapi.Auth {
return &pmapi.Auth{ return &pmapi.Auth{
RefreshToken: token, RefreshToken: token,

View File

@ -103,8 +103,9 @@ type ClientConfig struct {
// Zero means no limitation. // Zero means no limitation.
MinBytesPerSecond int64 MinBytesPerSecond int64
NoConnectionHandler func() ConnectionOnHandler func()
ConnectionHandler func() ConnectionOffHandler func()
UpgradeApplicationHandler func()
} }
// client is a client of the protonmail API. It implements the Client interface. // client is a client of the protonmail API. It implements the Client interface.
@ -265,6 +266,7 @@ func (c *client) doBuffered(req *http.Request, bodyBuffer []byte, retryUnauthori
if res == nil { if res == nil {
c.log.WithError(err).Error("Cannot get response") c.log.WithError(err).Error("Cannot get response")
err = ErrAPINotReachable err = ErrAPINotReachable
c.cm.noConnection()
} }
return return
} }
@ -405,6 +407,12 @@ func (c *client) doJSONBuffered(req *http.Request, reqBodyBuffer []byte, data in
} }
return c.doJSONBuffered(req, reqBodyBuffer, data) return c.doJSONBuffered(req, reqBodyBuffer, data)
} }
if errCode.Err() == ErrAPINotReachable {
c.cm.noConnection()
}
if errCode.Err() == ErrUpgradeApplication {
c.cm.config.UpgradeApplicationHandler()
}
} }
if err := json.Unmarshal(resBody, data); err != nil { if err := json.Unmarshal(resBody, data); err != nil {

View File

@ -31,7 +31,7 @@ import (
const maxLogoutRetries = 5 const maxLogoutRetries = 5
// ClientManager is a manager of clients. // ClientManager is a manager of clients.
type ClientManager struct { type ClientManager struct { //nolint[maligned]
// newClient is used to create new Clients. By default this creates pmapi clients but it can be overridden to // newClient is used to create new Clients. By default this creates pmapi clients but it can be overridden to
// create other types of clients (e.g. for integration tests). // create other types of clients (e.g. for integration tests).
newClient func(userID string) Client newClient func(userID string) Client
@ -61,6 +61,8 @@ type ClientManager struct {
idGen idGen idGen idGen
connectionOff bool
log *logrus.Entry log *logrus.Entry
} }
@ -108,6 +110,8 @@ func NewClientManager(config *ClientConfig) (cm *ClientManager) {
proxyProvider: newProxyProvider(dohProviders, proxyQuery), proxyProvider: newProxyProvider(dohProviders, proxyQuery),
proxyUseDuration: proxyUseDuration, proxyUseDuration: proxyUseDuration,
connectionOff: false,
log: logrus.WithField("pkg", "pmapi-manager"), log: logrus.WithField("pkg", "pmapi-manager"),
} }
@ -121,6 +125,30 @@ func NewClientManager(config *ClientConfig) (cm *ClientManager) {
return cm return cm
} }
func (cm *ClientManager) noConnection() {
cm.log.Trace("No connection available")
if cm.connectionOff {
return
}
cm.log.Warn("Connection lost")
cm.config.ConnectionOffHandler()
cm.connectionOff = true
go func() {
for {
time.Sleep(30 * time.Second)
if err := cm.CheckConnection(); err == nil {
cm.log.Info("Connection re-established")
cm.config.ConnectionOnHandler()
cm.connectionOff = false
return
}
}
}()
}
// SetClientConstructor sets the method used to construct clients. // SetClientConstructor sets the method used to construct clients.
// By default this is `pmapi.newClient` but can be overridden with this method. // By default this is `pmapi.newClient` but can be overridden with this method.
func (cm *ClientManager) SetClientConstructor(f func(userID string) Client) { func (cm *ClientManager) SetClientConstructor(f func(userID string) Client) {

View File

@ -37,17 +37,7 @@ func NewProxyTLSDialer(dialer TLSDialer, cm *ClientManager) *ProxyTLSDialer {
} }
// DialTLS dials the given network/address. If it fails, it retries using a proxy. // DialTLS dials the given network/address. If it fails, it retries using a proxy.
func (d *ProxyTLSDialer) DialTLS(network, address string) (net.Conn, error) { func (d *ProxyTLSDialer) DialTLS(network, address string) (conn net.Conn, err error) {
conn, err := d.dialTLS(network, address)
if err != nil {
d.cm.config.NoConnectionHandler()
} else {
d.cm.config.ConnectionHandler()
}
return conn, err
}
func (d *ProxyTLSDialer) dialTLS(network, address string) (conn net.Conn, err error) {
if conn, err = d.dialer.DialTLS(network, address); err == nil { if conn, err = d.dialer.DialTLS(network, address); err == nil {
return return
} }

View File

@ -59,7 +59,7 @@ func (ctx *TestContext) withIMAPServer() {
tls, _ := tls.New(settingsPath).GetConfig() tls, _ := tls.New(settingsPath).GetConfig()
backend := imap.NewIMAPBackend(ph, ctx.listener, ctx.cache, ctx.bridge) backend := imap.NewIMAPBackend(ph, ctx.listener, ctx.cache, ctx.bridge)
server := imap.NewIMAPServer(true, true, port, tls, backend, ctx.listener) server := imap.NewIMAPServer(ph, true, true, port, tls, backend, ctx.listener)
go server.ListenAndServe() go server.ListenAndServe()
require.NoError(ctx.t, waitForPort(port, 5*time.Second)) require.NoError(ctx.t, waitForPort(port, 5*time.Second))