mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-11 05:06:51 +00:00
Reduce number of synchronizations GODT-313
* [x] expononential cooldown of retries * [x] do not trigger sync by counts * [x] randomization of event poll interval
This commit is contained in:
@ -10,6 +10,10 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/)
|
|||||||
* More logs about event loop activity
|
* More logs about event loop activity
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
* GODT-313 Reduce number of synchronizations
|
||||||
|
* do not trigger sync by counts
|
||||||
|
* cooldown timer for sync retries
|
||||||
|
* poll interval randomization
|
||||||
* GODT-225 Do not send an EXISTS reposnse after EXPUNGE or when nothing changed (fixes rebuild of mailboxes in Outlook for Mac)
|
* GODT-225 Do not send an EXISTS reposnse after EXPUNGE or when nothing changed (fixes rebuild of mailboxes in Outlook for Mac)
|
||||||
* GODT-165 Optimization of RebuildMailboxes
|
* GODT-165 Optimization of RebuildMailboxes
|
||||||
* GODT-282 Completely delete old draft instead moving to trash when user updates draft
|
* GODT-282 Completely delete old draft instead moving to trash when user updates draft
|
||||||
|
|||||||
@ -359,7 +359,7 @@ func migratePreferencesFromC10(cfg *config.Config) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ioutil.WriteFile(pref11Path, data, 0644)
|
err = ioutil.WriteFile(pref11Path, data, 0644) //nolint[gosec]
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Problem to migrate preferences")
|
log.WithError(err).Error("Problem to migrate preferences")
|
||||||
return
|
return
|
||||||
|
|||||||
65
internal/store/cooldown.go
Normal file
65
internal/store/cooldown.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
// Copyright (c) 2020 Proton Technologies AG
|
||||||
|
//
|
||||||
|
// This file is part of ProtonMail Bridge.
|
||||||
|
//
|
||||||
|
// ProtonMail Bridge is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// ProtonMail Bridge is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with ProtonMail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type cooldown struct {
|
||||||
|
waitTimes []time.Duration
|
||||||
|
waitIndex int
|
||||||
|
lastTry time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cooldown) setExponentialWait(initial time.Duration, base int, maximum time.Duration) {
|
||||||
|
waitTimes := []time.Duration{}
|
||||||
|
t := initial
|
||||||
|
if base > 1 {
|
||||||
|
for t < maximum {
|
||||||
|
waitTimes = append(waitTimes, t)
|
||||||
|
t *= time.Duration(base)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
waitTimes = append(waitTimes, maximum)
|
||||||
|
c.setWaitTimes(waitTimes...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cooldown) setWaitTimes(newTimes ...time.Duration) {
|
||||||
|
c.waitTimes = newTimes
|
||||||
|
c.reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
// isTooSoon™ returns whether the cooldown period is not yet over.
|
||||||
|
func (c *cooldown) isTooSoon() bool {
|
||||||
|
if time.Since(c.lastTry) < c.waitTimes[c.waitIndex] {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
c.lastTry = time.Now()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cooldown) increaseWaitTime() {
|
||||||
|
c.lastTry = time.Now()
|
||||||
|
if c.waitIndex+1 < len(c.waitTimes) {
|
||||||
|
c.waitIndex++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cooldown) reset() {
|
||||||
|
c.waitIndex = 0
|
||||||
|
c.lastTry = time.Time{}
|
||||||
|
}
|
||||||
133
internal/store/cooldown_test.go
Normal file
133
internal/store/cooldown_test.go
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
// Copyright (c) 2020 Proton Technologies AG
|
||||||
|
//
|
||||||
|
// This file is part of ProtonMail Bridge.
|
||||||
|
//
|
||||||
|
// ProtonMail Bridge is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// ProtonMail Bridge is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with ProtonMail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCooldownExponentialWait(t *testing.T) {
|
||||||
|
ms := time.Millisecond
|
||||||
|
sec := time.Second
|
||||||
|
|
||||||
|
testData := []struct {
|
||||||
|
haveInitial, haveMax time.Duration
|
||||||
|
haveBase int
|
||||||
|
wantWaitTimes []time.Duration
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
haveInitial: 1 * sec,
|
||||||
|
haveBase: 0,
|
||||||
|
haveMax: 0 * sec,
|
||||||
|
wantWaitTimes: []time.Duration{0 * sec},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
haveInitial: 0 * sec,
|
||||||
|
haveBase: 1,
|
||||||
|
haveMax: 0 * sec,
|
||||||
|
wantWaitTimes: []time.Duration{0 * sec},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
haveInitial: 0 * sec,
|
||||||
|
haveBase: 0,
|
||||||
|
haveMax: 1 * sec,
|
||||||
|
wantWaitTimes: []time.Duration{1 * sec},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
haveInitial: 0 * sec,
|
||||||
|
haveBase: 1,
|
||||||
|
haveMax: 1 * sec,
|
||||||
|
wantWaitTimes: []time.Duration{1 * sec},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
haveInitial: 1 * sec,
|
||||||
|
haveBase: 0,
|
||||||
|
haveMax: 1 * sec,
|
||||||
|
wantWaitTimes: []time.Duration{1 * sec},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
haveInitial: 1 * sec,
|
||||||
|
haveBase: 2,
|
||||||
|
haveMax: 1 * sec,
|
||||||
|
wantWaitTimes: []time.Duration{1 * sec},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
haveInitial: 500 * ms,
|
||||||
|
haveBase: 2,
|
||||||
|
haveMax: 5 * sec,
|
||||||
|
wantWaitTimes: []time.Duration{500 * ms, 1 * sec, 2 * sec, 4 * sec, 5 * sec},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var testCooldown cooldown
|
||||||
|
|
||||||
|
for _, td := range testData {
|
||||||
|
testCooldown.setExponentialWait(td.haveInitial, td.haveBase, td.haveMax)
|
||||||
|
assert.Equal(t, td.wantWaitTimes, testCooldown.waitTimes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCooldownIncreaseAndReset(t *testing.T) {
|
||||||
|
var testCooldown cooldown
|
||||||
|
testCooldown.setWaitTimes(1*time.Second, 2*time.Second, 3*time.Second)
|
||||||
|
assert.Equal(t, 0, testCooldown.waitIndex)
|
||||||
|
|
||||||
|
assert.False(t, testCooldown.isTooSoon())
|
||||||
|
assert.True(t, testCooldown.isTooSoon())
|
||||||
|
assert.Equal(t, 0, testCooldown.waitIndex)
|
||||||
|
|
||||||
|
testCooldown.reset()
|
||||||
|
assert.Equal(t, 0, testCooldown.waitIndex)
|
||||||
|
|
||||||
|
assert.False(t, testCooldown.isTooSoon())
|
||||||
|
assert.True(t, testCooldown.isTooSoon())
|
||||||
|
assert.Equal(t, 0, testCooldown.waitIndex)
|
||||||
|
|
||||||
|
// increase at least N+1 times to check overflow
|
||||||
|
testCooldown.increaseWaitTime()
|
||||||
|
assert.True(t, testCooldown.isTooSoon())
|
||||||
|
testCooldown.increaseWaitTime()
|
||||||
|
assert.True(t, testCooldown.isTooSoon())
|
||||||
|
testCooldown.increaseWaitTime()
|
||||||
|
assert.True(t, testCooldown.isTooSoon())
|
||||||
|
testCooldown.increaseWaitTime()
|
||||||
|
assert.True(t, testCooldown.isTooSoon())
|
||||||
|
|
||||||
|
assert.Equal(t, 2, testCooldown.waitIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCooldownNotSooner(t *testing.T) {
|
||||||
|
var testCooldown cooldown
|
||||||
|
waitTime := 100 * time.Millisecond
|
||||||
|
retries := int64(10)
|
||||||
|
retryWait := time.Duration(waitTime.Milliseconds()/retries) * time.Millisecond
|
||||||
|
testCooldown.setWaitTimes(waitTime)
|
||||||
|
|
||||||
|
// first time it should never be too soon
|
||||||
|
assert.False(t, testCooldown.isTooSoon())
|
||||||
|
// these retries should be too soon
|
||||||
|
for i := retries; i > 0; i-- {
|
||||||
|
assert.True(t, testCooldown.isTooSoon())
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
}
|
||||||
|
// after given wait time it shouldn't be soon anymore
|
||||||
|
assert.False(t, testCooldown.isTooSoon())
|
||||||
|
}
|
||||||
@ -18,6 +18,7 @@
|
|||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
bridgeEvents "github.com/ProtonMail/proton-bridge/internal/events"
|
bridgeEvents "github.com/ProtonMail/proton-bridge/internal/events"
|
||||||
@ -28,6 +29,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const pollInterval = 30 * time.Second
|
const pollInterval = 30 * time.Second
|
||||||
|
const pollIntervalSpread = 5 * time.Second
|
||||||
|
|
||||||
type eventLoop struct {
|
type eventLoop struct {
|
||||||
cache *Cache
|
cache *Cache
|
||||||
@ -132,7 +134,7 @@ func (loop *eventLoop) start() { // nolint[funlen]
|
|||||||
loop.log.WithField("lastEventID", loop.currentEventID).Warn("Subscription stopped")
|
loop.log.WithField("lastEventID", loop.currentEventID).Warn("Subscription stopped")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
t := time.NewTicker(pollInterval)
|
t := time.NewTicker(pollInterval - pollIntervalSpread)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
loop.hasInternet = true
|
loop.hasInternet = true
|
||||||
@ -145,8 +147,11 @@ func (loop *eventLoop) start() { // nolint[funlen]
|
|||||||
case <-loop.stopCh:
|
case <-loop.stopCh:
|
||||||
close(loop.notifyStopCh)
|
close(loop.notifyStopCh)
|
||||||
return
|
return
|
||||||
case eventProcessedCh = <-loop.pollCh:
|
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
|
// Randomise periodic calls within range pollInterval ± pollSpread to reduces potential load spikes on API.
|
||||||
|
time.Sleep(time.Duration(rand.Intn(2*int(pollIntervalSpread.Milliseconds()))) * time.Millisecond)
|
||||||
|
case eventProcessedCh = <-loop.pollCh:
|
||||||
|
// We don't want to wait here. Polling should happen instantly.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Before we fetch the first event, check whether this is the first time we've
|
// Before we fetch the first event, check whether this is the first time we've
|
||||||
@ -545,7 +550,7 @@ func (loop *eventLoop) processMessageCounts(l *logrus.Entry, messageCounts []*pm
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !isSynced {
|
if !isSynced {
|
||||||
loop.store.triggerSync()
|
log.Error("The counts between DB and API are not matching")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -64,7 +64,7 @@ func TestEventLoopProcessMoreEvents(t *testing.T) {
|
|||||||
}, time.Second, 10*time.Millisecond)
|
}, time.Second, 10*time.Millisecond)
|
||||||
|
|
||||||
// For normal event we need to wait to next polling.
|
// For normal event we need to wait to next polling.
|
||||||
time.Sleep(pollInterval)
|
time.Sleep(pollInterval + pollIntervalSpread)
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
return m.store.eventLoop.currentEventID == "event71"
|
return m.store.eventLoop.currentEventID == "event71"
|
||||||
}, time.Second, 10*time.Millisecond)
|
}, time.Second, 10*time.Millisecond)
|
||||||
|
|||||||
@ -104,6 +104,7 @@ type Store struct {
|
|||||||
imapUpdates chan interface{}
|
imapUpdates chan interface{}
|
||||||
|
|
||||||
isSyncRunning bool
|
isSyncRunning bool
|
||||||
|
syncCooldown cooldown
|
||||||
addressMode addressMode
|
addressMode addressMode
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,6 +149,9 @@ func New(
|
|||||||
log: l,
|
log: l,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Minimal increase is event pollInterval, doubles every failed retry up to 5 minutes.
|
||||||
|
store.syncCooldown.setExponentialWait(pollInterval, 2, 5*time.Minute)
|
||||||
|
|
||||||
if err = store.init(firstInit); err != nil {
|
if err = store.init(firstInit); err != nil {
|
||||||
l.WithError(err).Error("Could not initialise store, attempting to close")
|
l.WithError(err).Error("Could not initialise store, attempting to close")
|
||||||
if storeCloseErr := store.Close(); storeCloseErr != nil {
|
if storeCloseErr := store.Close(); storeCloseErr != nil {
|
||||||
|
|||||||
@ -128,11 +128,19 @@ func (store *Store) triggerSync() {
|
|||||||
store.log.Debug("Store sync triggered")
|
store.log.Debug("Store sync triggered")
|
||||||
|
|
||||||
store.lock.Lock()
|
store.lock.Lock()
|
||||||
|
|
||||||
if store.isSyncRunning {
|
if store.isSyncRunning {
|
||||||
store.lock.Unlock()
|
store.lock.Unlock()
|
||||||
store.log.Info("Store sync is already ongoing")
|
store.log.Info("Store sync is already ongoing")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if store.syncCooldown.isTooSoon() {
|
||||||
|
store.lock.Unlock()
|
||||||
|
store.log.Info("Skipping sync: store tries to resync too often")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
store.isSyncRunning = true
|
store.isSyncRunning = true
|
||||||
store.lock.Unlock()
|
store.lock.Unlock()
|
||||||
|
|
||||||
@ -147,9 +155,11 @@ func (store *Store) triggerSync() {
|
|||||||
err := syncAllMail(store.panicHandler, store, store.api, syncState)
|
err := syncAllMail(store.panicHandler, store, store.api, syncState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Store sync failed")
|
log.WithError(err).Error("Store sync failed")
|
||||||
|
store.syncCooldown.increaseWaitTime()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
store.syncCooldown.reset()
|
||||||
syncState.setFinishTime()
|
syncState.setFinishTime()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -103,7 +103,7 @@ func (u *Updates) CreateJSONAndSign(deployDir, goos string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = ioutil.WriteFile(versionFilePath, txt, 0644); err != nil {
|
if err = ioutil.WriteFile(versionFilePath, txt, 0644); err != nil { //nolint[gosec]
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user