forked from Silverfish/proton-bridge
Pause event loop while FETCHing to prevetn EXPUNGE
This commit is contained in:
@ -40,6 +40,8 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/)
|
|||||||
* GODT-454 Fix send on closed channel when receiving unencrypted send confirmation from GUI.
|
* GODT-454 Fix send on closed channel when receiving unencrypted send confirmation from GUI.
|
||||||
* GODT-597 Duplicate sending when draft creation takes too long
|
* GODT-597 Duplicate sending when draft creation takes too long
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
* GODT-462 Pausing event loop while FETCHing to prevent EXPUNGE
|
||||||
|
|
||||||
## [v1.3.x] Emma (v1.3.2 beta 2020-08-04, v1.3.3 beta 2020-08-06, v1.3.3 live 2020-08-12)
|
## [v1.3.x] Emma (v1.3.2 beta 2020-08-04, v1.3.3 beta 2020-08-06, v1.3.3 live 2020-08-12)
|
||||||
|
|
||||||
|
|||||||
@ -383,6 +383,12 @@ func (im *imapMailbox) ListMessages(isUID bool, seqSet *imap.SeqSet, items []ima
|
|||||||
im.panicHandler.HandlePanic()
|
im.panicHandler.HandlePanic()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// EXPUNGE cannot be sent during listing and can come only from
|
||||||
|
// the event loop, so we prevent any server side update to avoid
|
||||||
|
// the problem.
|
||||||
|
im.storeUser.PauseEventLoop(true)
|
||||||
|
defer im.storeUser.PauseEventLoop(false)
|
||||||
|
|
||||||
var markAsReadIDs []string
|
var markAsReadIDs []string
|
||||||
markAsReadMutex := &sync.Mutex{}
|
markAsReadMutex := &sync.Mutex{}
|
||||||
|
|
||||||
|
|||||||
@ -41,6 +41,8 @@ type storeUserProvider interface {
|
|||||||
attachedPublicKey,
|
attachedPublicKey,
|
||||||
attachedPublicKeyName string,
|
attachedPublicKeyName string,
|
||||||
parentID string) (*pmapi.Message, []*pmapi.Attachment, error)
|
parentID string) (*pmapi.Message, []*pmapi.Attachment, error)
|
||||||
|
|
||||||
|
PauseEventLoop(bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
type storeAddressProvider interface {
|
type storeAddressProvider interface {
|
||||||
|
|||||||
@ -38,7 +38,8 @@ type eventLoop struct {
|
|||||||
pollCh chan chan struct{}
|
pollCh chan chan struct{}
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
notifyStopCh chan struct{}
|
notifyStopCh chan struct{}
|
||||||
isRunning bool
|
isRunning bool // The whole event loop is running.
|
||||||
|
isTickerPaused bool // The periodic loop is paused (but the event loop itself is still running).
|
||||||
hasInternet bool
|
hasInternet bool
|
||||||
|
|
||||||
pollCounter int
|
pollCounter int
|
||||||
@ -59,6 +60,7 @@ func newEventLoop(cache *Cache, store *Store, user BridgeUser, events listener.L
|
|||||||
currentEventID: cache.getEventID(user.ID()),
|
currentEventID: cache.getEventID(user.ID()),
|
||||||
pollCh: make(chan chan struct{}),
|
pollCh: make(chan chan struct{}),
|
||||||
isRunning: false,
|
isRunning: false,
|
||||||
|
isTickerPaused: false,
|
||||||
|
|
||||||
log: eventLog,
|
log: eventLog,
|
||||||
|
|
||||||
@ -68,10 +70,6 @@ func newEventLoop(cache *Cache, store *Store, user BridgeUser, events listener.L
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (loop *eventLoop) IsRunning() bool {
|
|
||||||
return loop.isRunning
|
|
||||||
}
|
|
||||||
|
|
||||||
func (loop *eventLoop) client() pmapi.Client {
|
func (loop *eventLoop) client() pmapi.Client {
|
||||||
return loop.store.client()
|
return loop.store.client()
|
||||||
}
|
}
|
||||||
@ -156,6 +154,10 @@ func (loop *eventLoop) loop() {
|
|||||||
close(loop.notifyStopCh)
|
close(loop.notifyStopCh)
|
||||||
return
|
return
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
|
if loop.isTickerPaused {
|
||||||
|
loop.log.Trace("Event loop paused, skipping")
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Randomise periodic calls within range pollInterval ± pollSpread to reduces potential load spikes on API.
|
// Randomise periodic calls within range pollInterval ± pollSpread to reduces potential load spikes on API.
|
||||||
time.Sleep(time.Duration(rand.Intn(2*int(pollIntervalSpread.Milliseconds()))) * time.Millisecond)
|
time.Sleep(time.Duration(rand.Intn(2*int(pollIntervalSpread.Milliseconds()))) * time.Millisecond)
|
||||||
case eventProcessedCh = <-loop.pollCh:
|
case eventProcessedCh = <-loop.pollCh:
|
||||||
|
|||||||
@ -348,6 +348,18 @@ func (store *Store) addAddress(address, addressID string, labels []*pmapi.Label)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PauseEventLoop sets whether the ticker is periodically polling or not.
|
||||||
|
func (store *Store) PauseEventLoop(pause bool) {
|
||||||
|
store.lock.Lock()
|
||||||
|
defer store.lock.Unlock()
|
||||||
|
|
||||||
|
store.log.WithField("pause", pause).Info("Pausing event loop")
|
||||||
|
|
||||||
|
if store.eventLoop != nil {
|
||||||
|
store.eventLoop.isTickerPaused = pause
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Close stops the event loop and closes the database to free the file.
|
// Close stops the event loop and closes the database to free the file.
|
||||||
func (store *Store) Close() error {
|
func (store *Store) Close() error {
|
||||||
store.lock.Lock()
|
store.lock.Lock()
|
||||||
|
|||||||
@ -26,6 +26,10 @@ import (
|
|||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (loop *eventLoop) IsRunning() bool {
|
||||||
|
return loop.isRunning
|
||||||
|
}
|
||||||
|
|
||||||
// TestSync triggers a sync of the store.
|
// TestSync triggers a sync of the store.
|
||||||
func (store *Store) TestSync() {
|
func (store *Store) TestSync() {
|
||||||
store.lock.Lock()
|
store.lock.Lock()
|
||||||
|
|||||||
Reference in New Issue
Block a user