diff --git a/internal/user/cooldown.go b/internal/user/cooldown.go new file mode 100644 index 00000000..a0fd389a --- /dev/null +++ b/internal/user/cooldown.go @@ -0,0 +1,62 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package user + +import ( + "math/rand" + "time" +) + +const SyncRetryCooldown = 20 * time.Second + +type cooldownProvider interface { + GetNextWaitTime() time.Duration +} + +func jitter(max int) time.Duration { + return time.Duration(rand.Intn(max)) * time.Second //nolint:gosec +} + +type expCooldown struct { + count int +} + +func (c *expCooldown) GetNextWaitTime() time.Duration { + waitTimes := []time.Duration{ + 20 * time.Second, + 40 * time.Second, + 80 * time.Second, + 160 * time.Second, + 300 * time.Second, + 600 * time.Second, + } + + last := len(waitTimes) - 1 + + if c.count >= last { + return waitTimes[last] + jitter(10) + } + + c.count++ + + return waitTimes[c.count-1] + jitter(10) +} + +type noCooldown struct{} + +func (c *noCooldown) GetNextWaitTime() time.Duration { return time.Millisecond } diff --git a/internal/user/sync.go b/internal/user/sync.go index 717a677a..f224f1d4 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -439,7 +439,18 @@ func (user *User) syncMessages( metadataChunks := xslices.Chunk(messageIDs, MetadataDataPageSize) for i, metadataChunk := range metadataChunks { logrus.Debugf("Metadata Request (%v of %v), previous: %v", i, len(metadataChunks), len(downloadReq.ids)) + + cooldown := expCooldown{} + metadata, err := client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk}) + for is429Error(err) { + wait := cooldown.GetNextWaitTime() + logrus.WithError(err).Warnf("Retrying in %v to download message metadata for chunk %v", wait, i) + sleepCtx(ctx, wait) + + metadata, err = client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk}) + } + if err != nil { logrus.WithError(err).Errorf("Failed to download message metadata for chunk %v", i) downloadReq.err = err diff --git a/internal/user/sync_downloader.go b/internal/user/sync_downloader.go index e84450bb..0084ab73 100644 --- a/internal/user/sync_downloader.go +++ b/internal/user/sync_downloader.go @@ -103,7 +103,7 @@ func startSyncDownloader( return } - batch, err := downloadMessagesStage2(ctx, result, downloader, cache, SyncRetryCooldown) + batch, err := downloadMessagesStage2(ctx, result, downloader, cache, &expCooldown{}) if err != nil { errorCh <- err return @@ -281,7 +281,7 @@ func downloadMessagesStage2( state []downloadResult, downloader MessageDownloader, cache *SyncDownloadCache, - coolDown time.Duration, + cooldown cooldownProvider, ) ([]proton.FullMessage, error) { logrus.Debug("Entering download stage 2") var retryList []int @@ -289,7 +289,7 @@ func downloadMessagesStage2( for { if shouldWaitBeforeRetry { - time.Sleep(coolDown) + time.Sleep(cooldown.GetNextWaitTime()) } retryList = nil diff --git a/internal/user/sync_downloader_test.go b/internal/user/sync_downloader_test.go index c03d6061..76a9b6a4 100644 --- a/internal/user/sync_downloader_test.go +++ b/internal/user/sync_downloader_test.go @@ -23,7 +23,6 @@ import ( "io" "strings" "testing" - "time" "github.com/ProtonMail/gluon/async" "github.com/ProtonMail/go-proton-api" @@ -143,7 +142,7 @@ func TestSyncDownloader_Stage2_Everything200(t *testing.T) { }, } - result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.NoError(t, err) require.Equal(t, 2, len(result)) } @@ -171,7 +170,7 @@ func TestSyncDownloader_Stage2_Not429(t *testing.T) { }, } - _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Equal(t, msgErr, err) } @@ -195,7 +194,7 @@ func TestSyncDownloader_Stage2_API500(t *testing.T) { }, } - _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Equal(t, msgErr, err) } @@ -303,7 +302,7 @@ func TestSyncDownloader_Stage2_Some429(t *testing.T) { }) } - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.NoError(t, err) require.Equal(t, 3, len(messages)) @@ -381,7 +380,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429MessageDownload(t *testing.T) { messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(1).Return(proton.Message{}, err500) } - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Empty(t, 0, messages) } @@ -424,7 +423,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429AttachmentDownload(t *testing.T) { // 500 for second attachment messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A4"), gomock.Any()).Times(1).Return(err500) - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Empty(t, 0, messages) } diff --git a/internal/user/user.go b/internal/user/user.go index 0399fba1..6ae06287 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -55,10 +55,6 @@ var ( EventJitter = 20 * time.Second // nolint:gochecknoglobals,revive ) -const ( - SyncRetryCooldown = 20 * time.Second -) - type User struct { log *logrus.Entry @@ -244,13 +240,16 @@ func New( return } + cooldown := expCooldown{} + for { if err := ctx.Err(); err != nil { user.log.WithError(err).Error("Sync aborted") return } else if err := user.doSync(ctx); err != nil { - user.log.WithError(err).Error("Failed to sync, will retry later") - sleepCtx(ctx, SyncRetryCooldown) + wait := cooldown.GetNextWaitTime() + user.log.WithField("retry-after", wait).WithError(err).Error("Failed to sync, will retry later") + sleepCtx(ctx, wait) } else { user.log.Info("Sync complete, starting API event stream") return