mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-18 16:17:03 +00:00
fix(GODT-2822): retry 429 for metadata and exponential cooldown GODT-2823.
This commit is contained in:
62
internal/user/cooldown.go
Normal file
62
internal/user/cooldown.go
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
// Copyright (c) 2023 Proton AG
|
||||||
|
//
|
||||||
|
// This file is part of Proton Mail Bridge.
|
||||||
|
//
|
||||||
|
// Proton Mail Bridge is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// Proton Mail Bridge is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package user
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const SyncRetryCooldown = 20 * time.Second
|
||||||
|
|
||||||
|
type cooldownProvider interface {
|
||||||
|
GetNextWaitTime() time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func jitter(max int) time.Duration {
|
||||||
|
return time.Duration(rand.Intn(max)) * time.Second //nolint:gosec
|
||||||
|
}
|
||||||
|
|
||||||
|
type expCooldown struct {
|
||||||
|
count int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *expCooldown) GetNextWaitTime() time.Duration {
|
||||||
|
waitTimes := []time.Duration{
|
||||||
|
20 * time.Second,
|
||||||
|
40 * time.Second,
|
||||||
|
80 * time.Second,
|
||||||
|
160 * time.Second,
|
||||||
|
300 * time.Second,
|
||||||
|
600 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
last := len(waitTimes) - 1
|
||||||
|
|
||||||
|
if c.count >= last {
|
||||||
|
return waitTimes[last] + jitter(10)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.count++
|
||||||
|
|
||||||
|
return waitTimes[c.count-1] + jitter(10)
|
||||||
|
}
|
||||||
|
|
||||||
|
type noCooldown struct{}
|
||||||
|
|
||||||
|
func (c *noCooldown) GetNextWaitTime() time.Duration { return time.Millisecond }
|
||||||
@ -439,7 +439,18 @@ func (user *User) syncMessages(
|
|||||||
metadataChunks := xslices.Chunk(messageIDs, MetadataDataPageSize)
|
metadataChunks := xslices.Chunk(messageIDs, MetadataDataPageSize)
|
||||||
for i, metadataChunk := range metadataChunks {
|
for i, metadataChunk := range metadataChunks {
|
||||||
logrus.Debugf("Metadata Request (%v of %v), previous: %v", i, len(metadataChunks), len(downloadReq.ids))
|
logrus.Debugf("Metadata Request (%v of %v), previous: %v", i, len(metadataChunks), len(downloadReq.ids))
|
||||||
|
|
||||||
|
cooldown := expCooldown{}
|
||||||
|
|
||||||
metadata, err := client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk})
|
metadata, err := client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk})
|
||||||
|
for is429Error(err) {
|
||||||
|
wait := cooldown.GetNextWaitTime()
|
||||||
|
logrus.WithError(err).Warnf("Retrying in %v to download message metadata for chunk %v", wait, i)
|
||||||
|
sleepCtx(ctx, wait)
|
||||||
|
|
||||||
|
metadata, err = client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk})
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to download message metadata for chunk %v", i)
|
logrus.WithError(err).Errorf("Failed to download message metadata for chunk %v", i)
|
||||||
downloadReq.err = err
|
downloadReq.err = err
|
||||||
|
|||||||
@ -103,7 +103,7 @@ func startSyncDownloader(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
batch, err := downloadMessagesStage2(ctx, result, downloader, cache, SyncRetryCooldown)
|
batch, err := downloadMessagesStage2(ctx, result, downloader, cache, &expCooldown{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorCh <- err
|
errorCh <- err
|
||||||
return
|
return
|
||||||
@ -281,7 +281,7 @@ func downloadMessagesStage2(
|
|||||||
state []downloadResult,
|
state []downloadResult,
|
||||||
downloader MessageDownloader,
|
downloader MessageDownloader,
|
||||||
cache *SyncDownloadCache,
|
cache *SyncDownloadCache,
|
||||||
coolDown time.Duration,
|
cooldown cooldownProvider,
|
||||||
) ([]proton.FullMessage, error) {
|
) ([]proton.FullMessage, error) {
|
||||||
logrus.Debug("Entering download stage 2")
|
logrus.Debug("Entering download stage 2")
|
||||||
var retryList []int
|
var retryList []int
|
||||||
@ -289,7 +289,7 @@ func downloadMessagesStage2(
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
if shouldWaitBeforeRetry {
|
if shouldWaitBeforeRetry {
|
||||||
time.Sleep(coolDown)
|
time.Sleep(cooldown.GetNextWaitTime())
|
||||||
}
|
}
|
||||||
|
|
||||||
retryList = nil
|
retryList = nil
|
||||||
|
|||||||
@ -23,7 +23,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ProtonMail/gluon/async"
|
"github.com/ProtonMail/gluon/async"
|
||||||
"github.com/ProtonMail/go-proton-api"
|
"github.com/ProtonMail/go-proton-api"
|
||||||
@ -143,7 +142,7 @@ func TestSyncDownloader_Stage2_Everything200(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond)
|
result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, len(result))
|
require.Equal(t, 2, len(result))
|
||||||
}
|
}
|
||||||
@ -171,7 +170,7 @@ func TestSyncDownloader_Stage2_Not429(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond)
|
_, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Equal(t, msgErr, err)
|
require.Equal(t, msgErr, err)
|
||||||
}
|
}
|
||||||
@ -195,7 +194,7 @@ func TestSyncDownloader_Stage2_API500(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond)
|
_, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Equal(t, msgErr, err)
|
require.Equal(t, msgErr, err)
|
||||||
}
|
}
|
||||||
@ -303,7 +302,7 @@ func TestSyncDownloader_Stage2_Some429(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond)
|
messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 3, len(messages))
|
require.Equal(t, 3, len(messages))
|
||||||
|
|
||||||
@ -381,7 +380,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429MessageDownload(t *testing.T) {
|
|||||||
messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(1).Return(proton.Message{}, err500)
|
messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(1).Return(proton.Message{}, err500)
|
||||||
}
|
}
|
||||||
|
|
||||||
messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond)
|
messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Empty(t, 0, messages)
|
require.Empty(t, 0, messages)
|
||||||
}
|
}
|
||||||
@ -424,7 +423,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429AttachmentDownload(t *testing.T) {
|
|||||||
// 500 for second attachment
|
// 500 for second attachment
|
||||||
messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A4"), gomock.Any()).Times(1).Return(err500)
|
messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A4"), gomock.Any()).Times(1).Return(err500)
|
||||||
|
|
||||||
messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond)
|
messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Empty(t, 0, messages)
|
require.Empty(t, 0, messages)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -55,10 +55,6 @@ var (
|
|||||||
EventJitter = 20 * time.Second // nolint:gochecknoglobals,revive
|
EventJitter = 20 * time.Second // nolint:gochecknoglobals,revive
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
SyncRetryCooldown = 20 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
type User struct {
|
type User struct {
|
||||||
log *logrus.Entry
|
log *logrus.Entry
|
||||||
|
|
||||||
@ -244,13 +240,16 @@ func New(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cooldown := expCooldown{}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if err := ctx.Err(); err != nil {
|
if err := ctx.Err(); err != nil {
|
||||||
user.log.WithError(err).Error("Sync aborted")
|
user.log.WithError(err).Error("Sync aborted")
|
||||||
return
|
return
|
||||||
} else if err := user.doSync(ctx); err != nil {
|
} else if err := user.doSync(ctx); err != nil {
|
||||||
user.log.WithError(err).Error("Failed to sync, will retry later")
|
wait := cooldown.GetNextWaitTime()
|
||||||
sleepCtx(ctx, SyncRetryCooldown)
|
user.log.WithField("retry-after", wait).WithError(err).Error("Failed to sync, will retry later")
|
||||||
|
sleepCtx(ctx, wait)
|
||||||
} else {
|
} else {
|
||||||
user.log.Info("Sync complete, starting API event stream")
|
user.log.Info("Sync complete, starting API event stream")
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user