From 5136919c369affdf60c792e5681d4f060e0ee269 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Wed, 26 Jul 2023 14:54:58 +0200 Subject: [PATCH 01/15] fix(GODT-2822): Handle 429 during message download When we run into 429 during a message download, do not cancel the whole batch and switch to a sequential downloader to avoid API overload. --- Makefile | 1 + internal/user/mocks/mocks.go | 66 +++++ internal/user/sync.go | 161 +---------- internal/user/sync_downloader.go | 339 ++++++++++++++++++++++ internal/user/sync_downloader_test.go | 400 ++++++++++++++++++++++++++ 5 files changed, 807 insertions(+), 160 deletions(-) create mode 100644 internal/user/mocks/mocks.go create mode 100644 internal/user/sync_downloader.go create mode 100644 internal/user/sync_downloader_test.go diff --git a/Makefile b/Makefile index c4156f84..fd200737 100644 --- a/Makefile +++ b/Makefile @@ -274,6 +274,7 @@ mocks: mockgen --package mocks github.com/ProtonMail/proton-bridge/v3/internal/updater Downloader,Installer > internal/updater/mocks/mocks.go mockgen --package mocks github.com/ProtonMail/proton-bridge/v3/internal/telemetry HeartbeatManager > internal/telemetry/mocks/mocks.go cp internal/telemetry/mocks/mocks.go internal/bridge/mocks/telemetry_mocks.go + mockgen --package mocks github.com/ProtonMail/proton-bridge/v3/internal/user MessageDownloader > internal/user/mocks/mocks.go lint: gofiles lint-golang lint-license lint-dependencies lint-changelog diff --git a/internal/user/mocks/mocks.go b/internal/user/mocks/mocks.go new file mode 100644 index 00000000..0d72426f --- /dev/null +++ b/internal/user/mocks/mocks.go @@ -0,0 +1,66 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ProtonMail/proton-bridge/v3/internal/user (interfaces: MessageDownloader) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + io "io" + reflect "reflect" + + proton "github.com/ProtonMail/go-proton-api" + gomock "github.com/golang/mock/gomock" +) + +// MockMessageDownloader is a mock of MessageDownloader interface. +type MockMessageDownloader struct { + ctrl *gomock.Controller + recorder *MockMessageDownloaderMockRecorder +} + +// MockMessageDownloaderMockRecorder is the mock recorder for MockMessageDownloader. +type MockMessageDownloaderMockRecorder struct { + mock *MockMessageDownloader +} + +// NewMockMessageDownloader creates a new mock instance. +func NewMockMessageDownloader(ctrl *gomock.Controller) *MockMessageDownloader { + mock := &MockMessageDownloader{ctrl: ctrl} + mock.recorder = &MockMessageDownloaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMessageDownloader) EXPECT() *MockMessageDownloaderMockRecorder { + return m.recorder +} + +// GetAttachmentInto mocks base method. +func (m *MockMessageDownloader) GetAttachmentInto(arg0 context.Context, arg1 string, arg2 io.ReaderFrom) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAttachmentInto", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetAttachmentInto indicates an expected call of GetAttachmentInto. +func (mr *MockMessageDownloaderMockRecorder) GetAttachmentInto(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAttachmentInto", reflect.TypeOf((*MockMessageDownloader)(nil).GetAttachmentInto), arg0, arg1, arg2) +} + +// GetMessage mocks base method. +func (m *MockMessageDownloader) GetMessage(arg0 context.Context, arg1 string) (proton.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMessage", arg0, arg1) + ret0, _ := ret[0].(proton.Message) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMessage indicates an expected call of GetMessage. +func (mr *MockMessageDownloaderMockRecorder) GetMessage(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMessage", reflect.TypeOf((*MockMessageDownloader)(nil).GetMessage), arg0, arg1) +} diff --git a/internal/user/sync.go b/internal/user/sync.go index 9fb59342..84cce7c6 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -378,32 +378,18 @@ func (user *User) syncMessages( batchLen int } - type downloadRequest struct { - ids []string - expectedSize uint64 - err error - } - - type downloadedMessageBatch struct { - batch []proton.FullMessage - } - type builtMessageBatch struct { batch []*buildRes } downloadCh := make(chan downloadRequest) - buildCh := make(chan downloadedMessageBatch) - // The higher this value, the longer we can continue our download iteration before being blocked on channel writes // to the update flushing goroutine. flushCh := make(chan builtMessageBatch) flushUpdateCh := make(chan flushUpdate) - errorCh := make(chan error, syncLimits.MaxParallelDownloads*4) - // Go routine in charge of downloading message metadata async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) { defer close(downloadCh) @@ -469,65 +455,7 @@ func (user *User) syncMessages( }, logging.Labels{"sync-stage": "meta-data"}) // Goroutine in charge of downloading and building messages in maxBatchSize batches. - async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) { - defer close(buildCh) - defer close(errorCh) - defer func() { - logrus.Debugf("sync downloader exit") - }() - - attachmentDownloader := user.newAttachmentDownloader(ctx, client, syncLimits.MaxParallelDownloads) - defer attachmentDownloader.close() - - for request := range downloadCh { - logrus.Debugf("Download request: %v MB:%v", len(request.ids), toMB(request.expectedSize)) - if request.err != nil { - errorCh <- request.err - return - } - - if ctx.Err() != nil { - errorCh <- ctx.Err() - return - } - - result, err := parallel.MapContext(ctx, syncLimits.MaxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) { - defer async.HandlePanic(user.panicHandler) - - var result proton.FullMessage - - msg, err := client.GetMessage(ctx, id) - if err != nil { - logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message") - return proton.FullMessage{}, err - } - - attachments, err := attachmentDownloader.getAttachments(ctx, msg.Attachments) - if err != nil { - logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message attachments") - return proton.FullMessage{}, err - } - - result.Message = msg - result.AttData = attachments - - return result, nil - }) - if err != nil { - errorCh <- err - return - } - - select { - case buildCh <- downloadedMessageBatch{ - batch: result, - }: - - case <-ctx.Done(): - return - } - } - }, logging.Labels{"sync-stage": "download"}) + buildCh, errorCh := startSyncDownloader(ctx, user.panicHandler, user.client, downloadCh, syncLimits) // Goroutine which builds messages after they have been downloaded async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) { @@ -793,93 +721,6 @@ func wantLabels(apiLabels map[string]proton.Label, labelIDs []string) []string { }) } -type attachmentResult struct { - attachment []byte - err error -} - -type attachmentJob struct { - id string - size int64 - result chan attachmentResult -} - -type attachmentDownloader struct { - workerCh chan attachmentJob - cancel context.CancelFunc -} - -func attachmentWorker(ctx context.Context, client *proton.Client, work <-chan attachmentJob) { - for { - select { - case <-ctx.Done(): - return - case job, ok := <-work: - if !ok { - return - } - var b bytes.Buffer - b.Grow(int(job.size)) - err := client.GetAttachmentInto(ctx, job.id, &b) - select { - case <-ctx.Done(): - close(job.result) - return - case job.result <- attachmentResult{attachment: b.Bytes(), err: err}: - close(job.result) - } - } - } -} - -func (user *User) newAttachmentDownloader(ctx context.Context, client *proton.Client, workerCount int) *attachmentDownloader { - workerCh := make(chan attachmentJob, (workerCount+2)*workerCount) - ctx, cancel := context.WithCancel(ctx) - for i := 0; i < workerCount; i++ { - workerCh = make(chan attachmentJob) - async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) { attachmentWorker(ctx, client, workerCh) }, logging.Labels{ - "sync": fmt.Sprintf("att-downloader %v", i), - }) - } - - return &attachmentDownloader{ - workerCh: workerCh, - cancel: cancel, - } -} - -func (a *attachmentDownloader) getAttachments(ctx context.Context, attachments []proton.Attachment) ([][]byte, error) { - resultChs := make([]chan attachmentResult, len(attachments)) - for i, id := range attachments { - resultChs[i] = make(chan attachmentResult, 1) - select { - case a.workerCh <- attachmentJob{id: id.ID, result: resultChs[i], size: id.Size}: - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - result := make([][]byte, len(attachments)) - var err error - for i := 0; i < len(attachments); i++ { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case r := <-resultChs[i]: - if r.err != nil { - err = fmt.Errorf("failed to get attachment %v: %w", attachments[i], r.err) - } - result[i] = r.attachment - } - } - - return result, err -} - -func (a *attachmentDownloader) close() { - a.cancel() -} - func chunkSyncBuilderBatch(batch []proton.FullMessage, maxMemory uint64) [][]proton.FullMessage { var expectedMemUsage uint64 var chunks [][]proton.FullMessage diff --git a/internal/user/sync_downloader.go b/internal/user/sync_downloader.go new file mode 100644 index 00000000..4d61b2f0 --- /dev/null +++ b/internal/user/sync_downloader.go @@ -0,0 +1,339 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package user + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "time" + + "github.com/ProtonMail/gluon/async" + "github.com/ProtonMail/gluon/logging" + "github.com/ProtonMail/go-proton-api" + "github.com/bradenaw/juniper/parallel" + "github.com/bradenaw/juniper/xslices" + "github.com/sirupsen/logrus" +) + +type downloadRequest struct { + ids []string + expectedSize uint64 + err error +} + +type downloadedMessageBatch struct { + batch []proton.FullMessage +} + +type MessageDownloader interface { + GetAttachmentInto(ctx context.Context, attachmentID string, reader io.ReaderFrom) error + GetMessage(ctx context.Context, messageID string) (proton.Message, error) +} + +type downloadState int + +const ( + downloadStateZero downloadState = iota + downloadStateHasMessage + downloadStateFinished +) + +type downloadResult struct { + ID string + State downloadState + Message proton.FullMessage + err error +} + +func startSyncDownloader(ctx context.Context, panicHandler async.PanicHandler, downloader MessageDownloader, downloadCh <-chan downloadRequest, syncLimits syncLimits) (<-chan downloadedMessageBatch, <-chan error) { + buildCh := make(chan downloadedMessageBatch) + errorCh := make(chan error, syncLimits.MaxParallelDownloads*4) + + // Goroutine in charge of downloading and building messages in maxBatchSize batches. + async.GoAnnotated(ctx, panicHandler, func(ctx context.Context) { + defer close(buildCh) + defer close(errorCh) + defer func() { + logrus.Debugf("sync downloader exit") + }() + + attachmentDownloader := newAttachmentDownloader(ctx, panicHandler, downloader, syncLimits.MaxParallelDownloads) + defer attachmentDownloader.close() + + for request := range downloadCh { + logrus.Debugf("Download request: %v MB:%v", len(request.ids), toMB(request.expectedSize)) + if request.err != nil { + errorCh <- request.err + return + } + + result, err := downloadMessageStage1(ctx, panicHandler, request, downloader, attachmentDownloader, syncLimits.MaxParallelDownloads) + if err != nil { + errorCh <- err + return + } + + if ctx.Err() != nil { + errorCh <- ctx.Err() + return + } + + batch, err := downloadMessagesStage2(ctx, result, downloader, SyncRetryCooldown) + if err != nil { + errorCh <- err + return + } + + select { + case buildCh <- downloadedMessageBatch{ + batch: batch, + }: + + case <-ctx.Done(): + return + } + } + }, logging.Labels{"sync-stage": "download"}) + + return buildCh, errorCh +} + +type attachmentResult struct { + attachment []byte + err error +} + +type attachmentJob struct { + id string + size int64 + result chan attachmentResult +} + +type attachmentDownloader struct { + workerCh chan attachmentJob + cancel context.CancelFunc +} + +func attachmentWorker(ctx context.Context, downloader MessageDownloader, work <-chan attachmentJob) { + for { + select { + case <-ctx.Done(): + return + case job, ok := <-work: + if !ok { + return + } + var b bytes.Buffer + b.Grow(int(job.size)) + err := downloader.GetAttachmentInto(ctx, job.id, &b) + select { + case <-ctx.Done(): + close(job.result) + return + case job.result <- attachmentResult{attachment: b.Bytes(), err: err}: + close(job.result) + } + } + } +} + +func newAttachmentDownloader(ctx context.Context, panicHandler async.PanicHandler, downloader MessageDownloader, workerCount int) *attachmentDownloader { + workerCh := make(chan attachmentJob, (workerCount+2)*workerCount) + ctx, cancel := context.WithCancel(ctx) + for i := 0; i < workerCount; i++ { + workerCh = make(chan attachmentJob) + async.GoAnnotated(ctx, panicHandler, func(ctx context.Context) { attachmentWorker(ctx, downloader, workerCh) }, logging.Labels{ + "sync": fmt.Sprintf("att-downloader %v", i), + }) + } + + return &attachmentDownloader{ + workerCh: workerCh, + cancel: cancel, + } +} + +func (a *attachmentDownloader) getAttachments(ctx context.Context, attachments []proton.Attachment) ([][]byte, error) { + resultChs := make([]chan attachmentResult, len(attachments)) + for i, id := range attachments { + resultChs[i] = make(chan attachmentResult, 1) + select { + case a.workerCh <- attachmentJob{id: id.ID, result: resultChs[i], size: id.Size}: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + result := make([][]byte, len(attachments)) + var err error + for i := 0; i < len(attachments); i++ { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case r := <-resultChs[i]: + if r.err != nil { + err = fmt.Errorf("failed to get attachment %v: %w", attachments[i], r.err) + } + result[i] = r.attachment + } + } + + return result, err +} + +func (a *attachmentDownloader) close() { + a.cancel() +} + +func downloadMessageStage1( + ctx context.Context, + panicHandler async.PanicHandler, + request downloadRequest, + downloader MessageDownloader, + attachmentDownloader *attachmentDownloader, + parallelDownloads int, +) ([]downloadResult, error) { + // 1st attempt download everything in parallel + return parallel.MapContext(ctx, parallelDownloads, request.ids, func(ctx context.Context, id string) (downloadResult, error) { + defer async.HandlePanic(panicHandler) + + result := downloadResult{ID: id} + + msg, err := downloader.GetMessage(ctx, id) + if err != nil { + logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message") + result.err = err + return result, nil + } + + result.Message.Message = msg + result.State = downloadStateHasMessage + + attachments, err := attachmentDownloader.getAttachments(ctx, msg.Attachments) + result.Message.AttData = attachments + + if err != nil { + logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message attachments") + result.err = err + return result, nil + } + + result.State = downloadStateFinished + + return result, nil + }) +} + +func downloadMessagesStage2(ctx context.Context, state []downloadResult, downloader MessageDownloader, coolDown time.Duration) ([]proton.FullMessage, error) { + logrus.Debug("Entering download stage 2") + var retryList []int + var shouldWaitBeforeRetry bool + + for { + if shouldWaitBeforeRetry { + time.Sleep(coolDown) + } + + retryList = nil + shouldWaitBeforeRetry = false + + for index, s := range state { + if s.State == downloadStateFinished { + continue + } + + if s.err != nil { + if is429Error(s.err) { + logrus.WithField("msg-id", s.ID).Debug("Message download failed due to 429, retrying") + retryList = append(retryList, index) + continue + } + return nil, s.err + } + } + + if len(retryList) == 0 { + break + } + + for _, i := range retryList { + st := &state[i] + if st.State == downloadStateZero { + message, err := downloader.GetMessage(ctx, st.ID) + if err != nil { + logrus.WithField("msg-id", st.ID).WithError(err).Error("failed to download message (429)") + if is429Error(err) { + st.err = err + shouldWaitBeforeRetry = true + continue + } + + return nil, err + } + + st.Message.Message = message + st.State = downloadStateHasMessage + } + + if st.Message.AttData == nil && st.Message.NumAttachments != 0 { + st.Message.AttData = make([][]byte, st.Message.NumAttachments) + } + + hasAllAttachments := true + for i := 0; i < st.Message.NumAttachments; i++ { + if st.Message.AttData[i] == nil { + buffer := bytes.Buffer{} + if err := downloader.GetAttachmentInto(ctx, st.Message.Attachments[i].ID, &buffer); err != nil { + logrus.WithField("msg-id", st.ID).WithError(err).Errorf("failed to download attachment %v/%v (429)", i+1, len(st.Message.Attachments)) + if is429Error(err) { + st.err = err + shouldWaitBeforeRetry = true + hasAllAttachments = false + continue + } + + return nil, err + } + + st.Message.AttData[i] = buffer.Bytes() + } + } + + if hasAllAttachments { + st.State = downloadStateFinished + } + } + } + + logrus.Debug("All message downloaded successfully") + return xslices.Map(state, func(s downloadResult) proton.FullMessage { + return s.Message + }), nil +} + +func is429Error(err error) bool { + var apiErr *proton.APIError + if errors.As(err, &apiErr) { + return apiErr.Status == 429 + } + + return false +} diff --git a/internal/user/sync_downloader_test.go b/internal/user/sync_downloader_test.go new file mode 100644 index 00000000..b8edfdd9 --- /dev/null +++ b/internal/user/sync_downloader_test.go @@ -0,0 +1,400 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package user + +import ( + "context" + "fmt" + "io" + "strings" + "testing" + "time" + + "github.com/ProtonMail/gluon/async" + "github.com/ProtonMail/go-proton-api" + "github.com/ProtonMail/proton-bridge/v3/internal/user/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +func TestSyncDownloader_Stage1_429(t *testing.T) { + // Check 429 is correctly caught and download state recorded correctly + // Message 1: All ok + // Message 2: Message failed + // Message 3: One attachment failed. + mockCtrl := gomock.NewController(t) + messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) + panicHandler := &async.NoopPanicHandler{} + ctx := context.Background() + + requests := downloadRequest{ + ids: []string{"Msg1", "Msg2", "Msg3"}, + expectedSize: 0, + err: nil, + } + + messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg1")).Times(1).Return(proton.Message{ + MessageMetadata: proton.MessageMetadata{ + ID: "MsgID1", + NumAttachments: 1, + }, + Attachments: []proton.Attachment{ + { + ID: "Attachment1_1", + }, + }, + }, nil) + + messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg2")).Times(1).Return(proton.Message{}, &proton.APIError{Status: 429}) + messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(1).Return(proton.Message{ + MessageMetadata: proton.MessageMetadata{ + ID: "MsgID3", + NumAttachments: 2, + }, + Attachments: []proton.Attachment{ + { + ID: "Attachment3_1", + }, + { + ID: "Attachment3_2", + }, + }, + }, nil) + + const attachmentData = "attachment data" + + messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("Attachment1_1"), gomock.Any()).Times(1).DoAndReturn(func(_ context.Context, _ string, r io.ReaderFrom) error { + _, err := r.ReadFrom(strings.NewReader(attachmentData)) + return err + }) + + messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("Attachment3_1"), gomock.Any()).Times(1).Return(&proton.APIError{Status: 429}) + messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("Attachment3_2"), gomock.Any()).Times(1).DoAndReturn(func(_ context.Context, _ string, r io.ReaderFrom) error { + _, err := r.ReadFrom(strings.NewReader(attachmentData)) + return err + }) + + attachmentDownloader := newAttachmentDownloader(ctx, panicHandler, messageDownloader, 1) + defer attachmentDownloader.close() + + result, err := downloadMessageStage1(ctx, panicHandler, requests, messageDownloader, attachmentDownloader, 1) + require.NoError(t, err) + require.Equal(t, 3, len(result)) + // Check message 1 + require.Equal(t, result[0].State, downloadStateFinished) + require.Equal(t, result[0].Message.ID, "MsgID1") + require.NotEmpty(t, result[0].Message.AttData) + require.NotEqual(t, attachmentData, result[0].Message.AttData[0]) + require.NotNil(t, result[0].Message.AttData[0]) + require.Nil(t, result[0].err) + + // Check message 2 + require.Equal(t, result[1].State, downloadStateZero) + require.Empty(t, result[1].Message.ID) + require.NotNil(t, result[1].err) + + require.Equal(t, result[2].State, downloadStateHasMessage) + require.Equal(t, result[2].Message.ID, "MsgID3") + require.Equal(t, 2, len(result[2].Message.AttData)) + require.NotNil(t, result[2].err) + require.Nil(t, result[2].Message.AttData[0]) + require.NotEqual(t, attachmentData, result[2].Message.AttData[1]) + require.NotNil(t, result[2].err) +} + +func TestSyncDownloader_Stage2_Everything200(t *testing.T) { + mockCtrl := gomock.NewController(t) + messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) + ctx := context.Background() + + downloadResult := []downloadResult{ + { + ID: "Msg1", + State: downloadStateFinished, + }, + { + ID: "Msg2", + State: downloadStateFinished, + }, + } + + result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + require.NoError(t, err) + require.Equal(t, 2, len(result)) +} + +func TestSyncDownloader_Stage2_Not429(t *testing.T) { + mockCtrl := gomock.NewController(t) + messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) + ctx := context.Background() + + msgErr := fmt.Errorf("something not 429") + downloadResult := []downloadResult{ + { + ID: "Msg1", + State: downloadStateFinished, + }, + { + ID: "Msg2", + State: downloadStateHasMessage, + err: msgErr, + }, + { + ID: "Msg3", + State: downloadStateFinished, + }, + } + + _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + require.Error(t, err) + require.Equal(t, msgErr, err) +} + +func TestSyncDownloader_Stage2_API500(t *testing.T) { + mockCtrl := gomock.NewController(t) + messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) + ctx := context.Background() + + msgErr := &proton.APIError{Status: 500} + downloadResult := []downloadResult{ + { + ID: "Msg2", + State: downloadStateHasMessage, + err: msgErr, + }, + { + ID: "Msg3", + State: downloadStateFinished, + }, + } + + _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + require.Error(t, err) + require.Equal(t, msgErr, err) +} + +func TestSyncDownloader_Stage2_Some429(t *testing.T) { + mockCtrl := gomock.NewController(t) + messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) + ctx := context.Background() + + const attachmentData1 = "attachment data 1" + const attachmentData2 = "attachment data 2" + const attachmentData3 = "attachment data 3" + const attachmentData4 = "attachment data 4" + + err429 := &proton.APIError{Status: 429} + downloadResult := []downloadResult{ + { + // Full message , but missing 1 of 2 attachments + ID: "Msg1", + Message: proton.FullMessage{ + Message: proton.Message{ + MessageMetadata: proton.MessageMetadata{ + ID: "Msg1", + NumAttachments: 2, + }, + Attachments: []proton.Attachment{ + { + ID: "A3", + }, + { + ID: "A4", + }, + }, + }, + AttData: [][]byte{ + nil, + []byte(attachmentData4), + }, + }, + State: downloadStateHasMessage, + err: err429, + }, + { + // Full message, but missing all attachments + ID: "Msg2", + Message: proton.FullMessage{ + Message: proton.Message{ + MessageMetadata: proton.MessageMetadata{ + ID: "Msg2", + NumAttachments: 2, + }, + Attachments: []proton.Attachment{ + { + ID: "A1", + }, + { + ID: "A2", + }, + }, + }, + AttData: nil, + }, + State: downloadStateHasMessage, + err: err429, + }, + { + // Missing everything + ID: "Msg3", + State: downloadStateZero, + Message: proton.FullMessage{ + Message: proton.Message{MessageMetadata: proton.MessageMetadata{ID: "Msg3"}}, + }, + err: err429, + }, + } + + { + // Simulate 2 failures for message 3 body. + firstCall := messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(2).Return(proton.Message{}, err429) + messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).After(firstCall).Times(1).Return(proton.Message{ + MessageMetadata: proton.MessageMetadata{ + ID: "Msg3", + }, + }, nil) + } + + { + // Simulate failures for message 2 attachments. + firstCall := messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A1"), gomock.Any()).Times(2).Return(err429) + messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A1"), gomock.Any()).After(firstCall).Times(1).DoAndReturn(func(_ context.Context, _ string, r io.ReaderFrom) error { + _, err := r.ReadFrom(strings.NewReader(attachmentData1)) + return err + }) + messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A2"), gomock.Any()).Times(1).DoAndReturn(func(_ context.Context, _ string, r io.ReaderFrom) error { + _, err := r.ReadFrom(strings.NewReader(attachmentData2)) + return err + }) + } + + { + messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A3"), gomock.Any()).Times(1).DoAndReturn(func(_ context.Context, _ string, r io.ReaderFrom) error { + _, err := r.ReadFrom(strings.NewReader(attachmentData3)) + return err + }) + } + + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + require.NoError(t, err) + require.Equal(t, 3, len(messages)) + + require.Equal(t, messages[0].Message.ID, "Msg1") + require.Equal(t, messages[1].Message.ID, "Msg2") + require.Equal(t, messages[2].Message.ID, "Msg3") + + // check attachments + require.Equal(t, attachmentData3, string(messages[0].AttData[0])) + require.Equal(t, attachmentData4, string(messages[0].AttData[1])) + require.Equal(t, attachmentData1, string(messages[1].AttData[0])) + require.Equal(t, attachmentData2, string(messages[1].AttData[1])) + require.Empty(t, messages[2].AttData) +} + +func TestSyncDownloader_Stage2_ErrorOnNon429MessageDownload(t *testing.T) { + mockCtrl := gomock.NewController(t) + messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) + ctx := context.Background() + + err429 := &proton.APIError{Status: 429} + err500 := &proton.APIError{Status: 500} + downloadResult := []downloadResult{ + { + // Missing everything + ID: "Msg3", + State: downloadStateZero, + Message: proton.FullMessage{ + Message: proton.Message{MessageMetadata: proton.MessageMetadata{ID: "Msg3"}}, + }, + err: err429, + }, + { + // Full message , but missing 1 of 2 attachments + ID: "Msg1", + Message: proton.FullMessage{ + Message: proton.Message{ + MessageMetadata: proton.MessageMetadata{ + ID: "Msg1", + NumAttachments: 2, + }, + Attachments: []proton.Attachment{ + { + ID: "A3", + }, + { + ID: "A4", + }, + }, + }, + }, + State: downloadStateHasMessage, + err: err429, + }, + } + + { + // Simulate 2 failures for message 3 body, + messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(1).Return(proton.Message{}, err500) + } + + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + require.Error(t, err) + require.Empty(t, 0, messages) +} + +func TestSyncDownloader_Stage2_ErrorOnNon429AttachmentDownload(t *testing.T) { + mockCtrl := gomock.NewController(t) + messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) + ctx := context.Background() + + err429 := &proton.APIError{Status: 429} + err500 := &proton.APIError{Status: 500} + downloadResult := []downloadResult{ + { + // Full message , but missing 1 of 2 attachments + ID: "Msg1", + Message: proton.FullMessage{ + Message: proton.Message{ + MessageMetadata: proton.MessageMetadata{ + ID: "Msg1", + NumAttachments: 2, + }, + Attachments: []proton.Attachment{ + { + ID: "A3", + }, + { + ID: "A4", + }, + }, + }, + }, + State: downloadStateHasMessage, + err: err429, + }, + } + + // 429 for first attachment + messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A3"), gomock.Any()).Times(1).Return(err429) + // 500 for second attachment + messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A4"), gomock.Any()).Times(1).Return(err500) + + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + require.Error(t, err) + require.Empty(t, 0, messages) +} From f1cf4ee19434af54d5d20460ee108d482545fec0 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Wed, 26 Jul 2023 17:56:57 +0200 Subject: [PATCH 02/15] fix(GODT-2822): Sync Cache When the sync fail, store the previously downloaded data in memory so that on next retry we don't have to re-download everything. --- internal/user/events.go | 2 + internal/user/sync.go | 41 ++++++++--- internal/user/sync_downloader.go | 82 ++++++++++++++++------ internal/user/sync_downloader_test.go | 89 +++++++++++++++++++++--- internal/user/sync_message_cache.go | 98 +++++++++++++++++++++++++++ internal/user/user.go | 2 + 6 files changed, 275 insertions(+), 39 deletions(-) create mode 100644 internal/user/sync_message_cache.go diff --git a/internal/user/events.go b/internal/user/events.go index 6876aef6..ce79758f 100644 --- a/internal/user/events.go +++ b/internal/user/events.go @@ -89,6 +89,8 @@ func (user *User) handleRefreshEvent(ctx context.Context, refresh proton.Refresh // Re-sync messages after the user, address and label refresh. defer user.goSync() + user.syncCache.Clear() + return user.syncUserAddressesLabelsAndClearSync(ctx, false) } diff --git a/internal/user/sync.go b/internal/user/sync.go index 84cce7c6..25cded53 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -373,13 +373,15 @@ func (user *User) syncMessages( ) type flushUpdate struct { - messageID string - err error - batchLen int + batchMessageID string + messages []proton.FullMessage + err error + batchLen int } type builtMessageBatch struct { - batch []*buildRes + batch []*buildRes + messages []proton.FullMessage } downloadCh := make(chan downloadRequest) @@ -455,7 +457,7 @@ func (user *User) syncMessages( }, logging.Labels{"sync-stage": "meta-data"}) // Goroutine in charge of downloading and building messages in maxBatchSize batches. - buildCh, errorCh := startSyncDownloader(ctx, user.panicHandler, user.client, downloadCh, syncLimits) + buildCh, errorCh := startSyncDownloader(ctx, user.panicHandler, user.client, user.syncCache, downloadCh, syncLimits) // Goroutine which builds messages after they have been downloaded async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) { @@ -501,7 +503,7 @@ func (user *User) syncMessages( } select { - case flushCh <- builtMessageBatch{result}: + case flushCh <- builtMessageBatch{batch: result, messages: buildBatch.batch}: case <-ctx.Done(): return @@ -580,9 +582,11 @@ func (user *User) syncMessages( select { case flushUpdateCh <- flushUpdate{ - messageID: downloadBatch.batch[0].messageID, - err: nil, - batchLen: len(downloadBatch.batch), + batchMessageID: downloadBatch.batch[0].messageID, + messages: downloadBatch.messages, + + err: nil, + batchLen: len(downloadBatch.batch), }: case <-ctx.Done(): return @@ -595,14 +599,29 @@ func (user *User) syncMessages( return flushUpdate.err } - if err := vault.SetLastMessageID(flushUpdate.messageID); err != nil { + if err := vault.SetLastMessageID(flushUpdate.batchMessageID); err != nil { return fmt.Errorf("failed to set last synced message ID: %w", err) } + for _, m := range flushUpdate.messages { + user.syncCache.DeleteMessages(m.ID) + if m.NumAttachments != 0 { + user.syncCache.DeleteAttachments(xslices.Map(m.Attachments, func(a proton.Attachment) string { + return a.ID + })...) + } + } + syncReporter.add(flushUpdate.batchLen) } - return <-errorCh + err := <-errorCh + + if err != nil { + user.syncCache.Clear() + } + + return err } func newSystemMailboxCreatedUpdate(labelID imap.MailboxID, labelName string) *imap.MailboxCreated { diff --git a/internal/user/sync_downloader.go b/internal/user/sync_downloader.go index 4d61b2f0..e84450bb 100644 --- a/internal/user/sync_downloader.go +++ b/internal/user/sync_downloader.go @@ -63,7 +63,14 @@ type downloadResult struct { err error } -func startSyncDownloader(ctx context.Context, panicHandler async.PanicHandler, downloader MessageDownloader, downloadCh <-chan downloadRequest, syncLimits syncLimits) (<-chan downloadedMessageBatch, <-chan error) { +func startSyncDownloader( + ctx context.Context, + panicHandler async.PanicHandler, + downloader MessageDownloader, + cache *SyncDownloadCache, + downloadCh <-chan downloadRequest, + syncLimits syncLimits, +) (<-chan downloadedMessageBatch, <-chan error) { buildCh := make(chan downloadedMessageBatch) errorCh := make(chan error, syncLimits.MaxParallelDownloads*4) @@ -75,7 +82,7 @@ func startSyncDownloader(ctx context.Context, panicHandler async.PanicHandler, d logrus.Debugf("sync downloader exit") }() - attachmentDownloader := newAttachmentDownloader(ctx, panicHandler, downloader, syncLimits.MaxParallelDownloads) + attachmentDownloader := newAttachmentDownloader(ctx, panicHandler, downloader, cache, syncLimits.MaxParallelDownloads) defer attachmentDownloader.close() for request := range downloadCh { @@ -85,7 +92,7 @@ func startSyncDownloader(ctx context.Context, panicHandler async.PanicHandler, d return } - result, err := downloadMessageStage1(ctx, panicHandler, request, downloader, attachmentDownloader, syncLimits.MaxParallelDownloads) + result, err := downloadMessageStage1(ctx, panicHandler, request, downloader, attachmentDownloader, cache, syncLimits.MaxParallelDownloads) if err != nil { errorCh <- err return @@ -96,7 +103,7 @@ func startSyncDownloader(ctx context.Context, panicHandler async.PanicHandler, d return } - batch, err := downloadMessagesStage2(ctx, result, downloader, SyncRetryCooldown) + batch, err := downloadMessagesStage2(ctx, result, downloader, cache, SyncRetryCooldown) if err != nil { errorCh <- err return @@ -132,7 +139,7 @@ type attachmentDownloader struct { cancel context.CancelFunc } -func attachmentWorker(ctx context.Context, downloader MessageDownloader, work <-chan attachmentJob) { +func attachmentWorker(ctx context.Context, downloader MessageDownloader, cache *SyncDownloadCache, work <-chan attachmentJob) { for { select { case <-ctx.Done(): @@ -141,26 +148,45 @@ func attachmentWorker(ctx context.Context, downloader MessageDownloader, work <- if !ok { return } - var b bytes.Buffer - b.Grow(int(job.size)) - err := downloader.GetAttachmentInto(ctx, job.id, &b) + + var result attachmentResult + if data, ok := cache.GetAttachment(job.id); ok { + result.attachment = data + result.err = nil + } else { + var b bytes.Buffer + b.Grow(int(job.size)) + err := downloader.GetAttachmentInto(ctx, job.id, &b) + result.attachment = b.Bytes() + result.err = err + if err == nil { + cache.StoreAttachment(job.id, result.attachment) + } + } + select { case <-ctx.Done(): close(job.result) return - case job.result <- attachmentResult{attachment: b.Bytes(), err: err}: + case job.result <- result: close(job.result) } } } } -func newAttachmentDownloader(ctx context.Context, panicHandler async.PanicHandler, downloader MessageDownloader, workerCount int) *attachmentDownloader { +func newAttachmentDownloader( + ctx context.Context, + panicHandler async.PanicHandler, + downloader MessageDownloader, + cache *SyncDownloadCache, + workerCount int, +) *attachmentDownloader { workerCh := make(chan attachmentJob, (workerCount+2)*workerCount) ctx, cancel := context.WithCancel(ctx) for i := 0; i < workerCount; i++ { workerCh = make(chan attachmentJob) - async.GoAnnotated(ctx, panicHandler, func(ctx context.Context) { attachmentWorker(ctx, downloader, workerCh) }, logging.Labels{ + async.GoAnnotated(ctx, panicHandler, func(ctx context.Context) { attachmentWorker(ctx, downloader, cache, workerCh) }, logging.Labels{ "sync": fmt.Sprintf("att-downloader %v", i), }) } @@ -209,6 +235,7 @@ func downloadMessageStage1( request downloadRequest, downloader MessageDownloader, attachmentDownloader *attachmentDownloader, + cache *SyncDownloadCache, parallelDownloads int, ) ([]downloadResult, error) { // 1st attempt download everything in parallel @@ -217,21 +244,28 @@ func downloadMessageStage1( result := downloadResult{ID: id} - msg, err := downloader.GetMessage(ctx, id) - if err != nil { - logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message") - result.err = err - return result, nil + v, ok := cache.GetMessage(id) + if !ok { + msg, err := downloader.GetMessage(ctx, id) + if err != nil { + logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message") + result.err = err + return result, nil + } + + cache.StoreMessage(msg) + result.Message.Message = msg + } else { + result.Message.Message = v } - result.Message.Message = msg result.State = downloadStateHasMessage - attachments, err := attachmentDownloader.getAttachments(ctx, msg.Attachments) + attachments, err := attachmentDownloader.getAttachments(ctx, result.Message.Attachments) result.Message.AttData = attachments if err != nil { - logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message attachments") + logrus.WithError(err).WithField("msgID", id).Error("Failed to download message attachments") result.err = err return result, nil } @@ -242,7 +276,13 @@ func downloadMessageStage1( }) } -func downloadMessagesStage2(ctx context.Context, state []downloadResult, downloader MessageDownloader, coolDown time.Duration) ([]proton.FullMessage, error) { +func downloadMessagesStage2( + ctx context.Context, + state []downloadResult, + downloader MessageDownloader, + cache *SyncDownloadCache, + coolDown time.Duration, +) ([]proton.FullMessage, error) { logrus.Debug("Entering download stage 2") var retryList []int var shouldWaitBeforeRetry bool @@ -289,6 +329,7 @@ func downloadMessagesStage2(ctx context.Context, state []downloadResult, downloa return nil, err } + cache.StoreMessage(message) st.Message.Message = message st.State = downloadStateHasMessage } @@ -314,6 +355,7 @@ func downloadMessagesStage2(ctx context.Context, state []downloadResult, downloa } st.Message.AttData[i] = buffer.Bytes() + cache.StoreAttachment(st.Message.Attachments[i].ID, st.Message.AttData[i]) } } diff --git a/internal/user/sync_downloader_test.go b/internal/user/sync_downloader_test.go index b8edfdd9..c03d6061 100644 --- a/internal/user/sync_downloader_test.go +++ b/internal/user/sync_downloader_test.go @@ -89,10 +89,11 @@ func TestSyncDownloader_Stage1_429(t *testing.T) { return err }) - attachmentDownloader := newAttachmentDownloader(ctx, panicHandler, messageDownloader, 1) + cache := newSyncDownloadCache() + attachmentDownloader := newAttachmentDownloader(ctx, panicHandler, messageDownloader, cache, 1) defer attachmentDownloader.close() - result, err := downloadMessageStage1(ctx, panicHandler, requests, messageDownloader, attachmentDownloader, 1) + result, err := downloadMessageStage1(ctx, panicHandler, requests, messageDownloader, attachmentDownloader, cache, 1) require.NoError(t, err) require.Equal(t, 3, len(result)) // Check message 1 @@ -115,12 +116,21 @@ func TestSyncDownloader_Stage1_429(t *testing.T) { require.Nil(t, result[2].Message.AttData[0]) require.NotEqual(t, attachmentData, result[2].Message.AttData[1]) require.NotNil(t, result[2].err) + + _, ok := cache.GetMessage("MsgID1") + require.True(t, ok) + _, ok = cache.GetMessage("MsgID3") + require.True(t, ok) + att, ok := cache.GetAttachment("Attachment1_1") + require.True(t, ok) + require.Equal(t, attachmentData, string(att)) } func TestSyncDownloader_Stage2_Everything200(t *testing.T) { mockCtrl := gomock.NewController(t) messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) ctx := context.Background() + cache := newSyncDownloadCache() downloadResult := []downloadResult{ { @@ -133,7 +143,7 @@ func TestSyncDownloader_Stage2_Everything200(t *testing.T) { }, } - result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) require.NoError(t, err) require.Equal(t, 2, len(result)) } @@ -142,6 +152,7 @@ func TestSyncDownloader_Stage2_Not429(t *testing.T) { mockCtrl := gomock.NewController(t) messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) ctx := context.Background() + cache := newSyncDownloadCache() msgErr := fmt.Errorf("something not 429") downloadResult := []downloadResult{ @@ -160,7 +171,7 @@ func TestSyncDownloader_Stage2_Not429(t *testing.T) { }, } - _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) require.Error(t, err) require.Equal(t, msgErr, err) } @@ -169,6 +180,7 @@ func TestSyncDownloader_Stage2_API500(t *testing.T) { mockCtrl := gomock.NewController(t) messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) ctx := context.Background() + cache := newSyncDownloadCache() msgErr := &proton.APIError{Status: 500} downloadResult := []downloadResult{ @@ -183,7 +195,7 @@ func TestSyncDownloader_Stage2_API500(t *testing.T) { }, } - _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) require.Error(t, err) require.Equal(t, msgErr, err) } @@ -192,6 +204,7 @@ func TestSyncDownloader_Stage2_Some429(t *testing.T) { mockCtrl := gomock.NewController(t) messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) ctx := context.Background() + cache := newSyncDownloadCache() const attachmentData1 = "attachment data 1" const attachmentData2 = "attachment data 2" @@ -290,7 +303,7 @@ func TestSyncDownloader_Stage2_Some429(t *testing.T) { }) } - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) require.NoError(t, err) require.Equal(t, 3, len(messages)) @@ -304,12 +317,28 @@ func TestSyncDownloader_Stage2_Some429(t *testing.T) { require.Equal(t, attachmentData1, string(messages[1].AttData[0])) require.Equal(t, attachmentData2, string(messages[1].AttData[1])) require.Empty(t, messages[2].AttData) + + _, ok := cache.GetMessage("Msg3") + require.True(t, ok) + + att3, ok := cache.GetAttachment("A3") + require.True(t, ok) + require.Equal(t, attachmentData3, string(att3)) + + att1, ok := cache.GetAttachment("A1") + require.True(t, ok) + require.Equal(t, attachmentData1, string(att1)) + + att2, ok := cache.GetAttachment("A2") + require.True(t, ok) + require.Equal(t, attachmentData2, string(att2)) } func TestSyncDownloader_Stage2_ErrorOnNon429MessageDownload(t *testing.T) { mockCtrl := gomock.NewController(t) messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) ctx := context.Background() + cache := newSyncDownloadCache() err429 := &proton.APIError{Status: 429} err500 := &proton.APIError{Status: 500} @@ -352,7 +381,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429MessageDownload(t *testing.T) { messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(1).Return(proton.Message{}, err500) } - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) require.Error(t, err) require.Empty(t, 0, messages) } @@ -361,6 +390,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429AttachmentDownload(t *testing.T) { mockCtrl := gomock.NewController(t) messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) ctx := context.Background() + cache := newSyncDownloadCache() err429 := &proton.APIError{Status: 429} err500 := &proton.APIError{Status: 500} @@ -394,7 +424,50 @@ func TestSyncDownloader_Stage2_ErrorOnNon429AttachmentDownload(t *testing.T) { // 500 for second attachment messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A4"), gomock.Any()).Times(1).Return(err500) - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) require.Error(t, err) require.Empty(t, 0, messages) } + +func TestSyncDownloader_Stage1_DoNotDownloadIfAlreadyInCache(t *testing.T) { + mockCtrl := gomock.NewController(t) + messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) + panicHandler := &async.NoopPanicHandler{} + ctx := context.Background() + + requests := downloadRequest{ + ids: []string{"Msg1", "Msg3"}, + expectedSize: 0, + err: nil, + } + + cache := newSyncDownloadCache() + attachmentDownloader := newAttachmentDownloader(ctx, panicHandler, messageDownloader, cache, 1) + defer attachmentDownloader.close() + + const attachmentData = "attachment data" + + cache.StoreMessage(proton.Message{MessageMetadata: proton.MessageMetadata{ID: "Msg1", NumAttachments: 1}, Attachments: []proton.Attachment{{ID: "A1"}}}) + cache.StoreMessage(proton.Message{MessageMetadata: proton.MessageMetadata{ID: "Msg3", NumAttachments: 2}, Attachments: []proton.Attachment{{ID: "A2"}}}) + + cache.StoreAttachment("A1", []byte(attachmentData)) + cache.StoreAttachment("A2", []byte(attachmentData)) + + result, err := downloadMessageStage1(ctx, panicHandler, requests, messageDownloader, attachmentDownloader, cache, 1) + require.NoError(t, err) + require.Equal(t, 2, len(result)) + + require.Equal(t, result[0].State, downloadStateFinished) + require.Equal(t, result[0].Message.ID, "Msg1") + require.NotEmpty(t, result[0].Message.AttData) + require.NotEqual(t, attachmentData, result[0].Message.AttData[0]) + require.NotNil(t, result[0].Message.AttData[0]) + require.Nil(t, result[0].err) + + require.Equal(t, result[1].State, downloadStateFinished) + require.Equal(t, result[1].Message.ID, "Msg3") + require.NotEmpty(t, result[1].Message.AttData) + require.NotEqual(t, attachmentData, result[1].Message.AttData[0]) + require.NotNil(t, result[1].Message.AttData[0]) + require.Nil(t, result[1].err) +} diff --git a/internal/user/sync_message_cache.go b/internal/user/sync_message_cache.go new file mode 100644 index 00000000..6cec6e10 --- /dev/null +++ b/internal/user/sync_message_cache.go @@ -0,0 +1,98 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package user + +import ( + "sync" + + "github.com/ProtonMail/go-proton-api" +) + +type SyncDownloadCache struct { + messageLock sync.RWMutex + messages map[string]proton.Message + attachmentLock sync.RWMutex + attachments map[string][]byte +} + +func newSyncDownloadCache() *SyncDownloadCache { + return &SyncDownloadCache{ + messages: make(map[string]proton.Message, 64), + attachments: make(map[string][]byte, 64), + } +} + +func (s *SyncDownloadCache) StoreMessage(message proton.Message) { + s.messageLock.Lock() + defer s.messageLock.Unlock() + + s.messages[message.ID] = message +} + +func (s *SyncDownloadCache) StoreAttachment(id string, data []byte) { + s.attachmentLock.Lock() + defer s.attachmentLock.Unlock() + + s.attachments[id] = data +} + +func (s *SyncDownloadCache) DeleteMessages(id ...string) { + s.messageLock.Lock() + defer s.messageLock.Unlock() + + for _, id := range id { + delete(s.messages, id) + } +} + +func (s *SyncDownloadCache) DeleteAttachments(id ...string) { + s.attachmentLock.Lock() + defer s.attachmentLock.Unlock() + + for _, id := range id { + delete(s.attachments, id) + } +} + +func (s *SyncDownloadCache) GetMessage(id string) (proton.Message, bool) { + s.messageLock.RLock() + defer s.messageLock.RUnlock() + + v, ok := s.messages[id] + + return v, ok +} + +func (s *SyncDownloadCache) GetAttachment(id string) ([]byte, bool) { + s.attachmentLock.RLock() + defer s.attachmentLock.RUnlock() + + v, ok := s.attachments[id] + + return v, ok +} + +func (s *SyncDownloadCache) Clear() { + s.messageLock.Lock() + s.messages = make(map[string]proton.Message, 64) + s.messageLock.Unlock() + + s.attachmentLock.Lock() + s.attachments = make(map[string][]byte, 64) + s.attachmentLock.Unlock() +} diff --git a/internal/user/user.go b/internal/user/user.go index 49d29ab7..0399fba1 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -93,6 +93,7 @@ type User struct { showAllMail uint32 maxSyncMemory uint64 + syncCache *SyncDownloadCache panicHandler async.PanicHandler @@ -171,6 +172,7 @@ func New( showAllMail: b32(showAllMail), maxSyncMemory: maxSyncMemory, + syncCache: newSyncDownloadCache(), panicHandler: crashHandler, From f82965b8258f6c8d4656b7476f469588384871b3 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Wed, 26 Jul 2023 18:12:06 +0200 Subject: [PATCH 03/15] fix(GODT-2812): Fix rare sync deadlock Copy data rather than hold onto the locks while sync is ongoing. The data in question does not change while the sync is ongoing and holding on to the locks during a very long sync can create a deadlock with due to some IMAP operation that needs to acquire one of those locks with write access. --- internal/user/sync.go | 140 +++++++++++++++++++++++------------------- 1 file changed, 76 insertions(+), 64 deletions(-) diff --git a/internal/user/sync.go b/internal/user/sync.go index 25cded53..b0183392 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -113,77 +113,89 @@ func (user *User) doSync(ctx context.Context) error { } func (user *User) sync(ctx context.Context) error { - return safe.RLockRet(func() error { - return withAddrKRs(user.apiUser, user.apiAddrs, user.vault.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error { - if !user.vault.SyncStatus().HasLabels { - user.log.Info("Syncing labels") + var apiUser proton.User + var apiAddrs map[string]proton.Address + var apiLabels map[string]proton.Label - if err := syncLabels(ctx, user.apiLabels, xslices.Unique(maps.Values(user.updateCh))...); err != nil { - return fmt.Errorf("failed to sync labels: %w", err) - } + safe.RLock(func() { + // Make a copy of the user, labels and addresses. They don't change during sync and we don't need to + // keep holding on to these locks which are required for imap commands to succeed while this is going on. + apiUser = user.apiUser + apiAddrs = maps.Clone(user.apiAddrs) + apiLabels = maps.Clone(user.apiLabels) + }, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock) - if err := user.vault.SetHasLabels(true); err != nil { - return fmt.Errorf("failed to set has labels: %w", err) - } + // We can keep holding on to the updateCh as this is not used during regular imap processing, only with events. + user.updateChLock.RLock() + defer user.updateChLock.RUnlock() - user.log.Info("Synced labels") - } else { - user.log.Info("Labels are already synced, skipping") + return withAddrKRs(apiUser, apiAddrs, user.vault.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error { + if !user.vault.SyncStatus().HasLabels { + user.log.Info("Syncing labels") + + if err := syncLabels(ctx, apiLabels, xslices.Unique(maps.Values(user.updateCh))...); err != nil { + return fmt.Errorf("failed to sync labels: %w", err) } - if !user.vault.SyncStatus().HasMessages { - user.log.Info("Syncing messages") - - // Determine which messages to sync. - messageIDs, err := user.client.GetMessageIDs(ctx, "") - if err != nil { - return fmt.Errorf("failed to get message IDs to sync: %w", err) - } - - logrus.Debugf("User has the following failed synced message ids: %v", user.vault.SyncStatus().FailedMessageIDs) - - // Remove any messages that have already failed to sync. - messageIDs = xslices.Filter(messageIDs, func(messageID string) bool { - return !slices.Contains(user.vault.SyncStatus().FailedMessageIDs, messageID) - }) - - // Reverse the order of the message IDs so that the newest messages are synced first. - xslices.Reverse(messageIDs) - - // If we have a message ID that we've already synced, then we can skip all messages before it. - if idx := xslices.Index(messageIDs, user.vault.SyncStatus().LastMessageID); idx >= 0 { - messageIDs = messageIDs[idx+1:] - } - - // Sync the messages. - if err := user.syncMessages( - ctx, - user.ID(), - messageIDs, - user.client, - user.reporter, - user.vault, - user.apiLabels, - addrKRs, - user.updateCh, - user.eventCh, - user.maxSyncMemory, - ); err != nil { - return fmt.Errorf("failed to sync messages: %w", err) - } - - if err := user.vault.SetHasMessages(true); err != nil { - return fmt.Errorf("failed to set has messages: %w", err) - } - - user.log.Info("Synced messages") - } else { - user.log.Info("Messages are already synced, skipping") + if err := user.vault.SetHasLabels(true); err != nil { + return fmt.Errorf("failed to set has labels: %w", err) } - return nil - }) - }, user.apiUserLock, user.apiAddrsLock, user.apiLabelsLock, user.updateChLock) + user.log.Info("Synced labels") + } else { + user.log.Info("Labels are already synced, skipping") + } + + if !user.vault.SyncStatus().HasMessages { + user.log.Info("Syncing messages") + + // Determine which messages to sync. + messageIDs, err := user.client.GetMessageIDs(ctx, "") + if err != nil { + return fmt.Errorf("failed to get message IDs to sync: %w", err) + } + + // Remove any messages that have already failed to sync. + messageIDs = xslices.Filter(messageIDs, func(messageID string) bool { + return !slices.Contains(user.vault.SyncStatus().FailedMessageIDs, messageID) + }) + + // Reverse the order of the message IDs so that the newest messages are synced first. + xslices.Reverse(messageIDs) + + // If we have a message ID that we've already synced, then we can skip all messages before it. + if idx := xslices.Index(messageIDs, user.vault.SyncStatus().LastMessageID); idx >= 0 { + messageIDs = messageIDs[idx+1:] + } + + // Sync the messages. + if err := user.syncMessages( + ctx, + user.ID(), + messageIDs, + user.client, + user.reporter, + user.vault, + apiLabels, + addrKRs, + user.updateCh, + user.eventCh, + user.maxSyncMemory, + ); err != nil { + return fmt.Errorf("failed to sync messages: %w", err) + } + + if err := user.vault.SetHasMessages(true); err != nil { + return fmt.Errorf("failed to set has messages: %w", err) + } + + user.log.Info("Synced messages") + } else { + user.log.Info("Messages are already synced, skipping") + } + + return nil + }) } // nolint:exhaustive From 25a787529b8cec2b3b9c0acdde07fcd6e1d51a0b Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Thu, 27 Jul 2023 10:00:15 +0200 Subject: [PATCH 04/15] fix(GODT-2822): Retry on 429 during Message ID fetch --- go.mod | 2 +- go.sum | 4 ++-- internal/user/debug.go | 2 +- internal/user/sync.go | 26 +++++++++++++++++++++++++- 4 files changed, 29 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 171266e5..75e25390 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/Masterminds/semver/v3 v3.2.0 github.com/ProtonMail/gluon v0.16.1-0.20230706110757-a9327fb18611 github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a - github.com/ProtonMail/go-proton-api v0.4.1-0.20230704060229-a77a437ec052 + github.com/ProtonMail/go-proton-api v0.4.1-0.20230727082922-9115b4750ec7 github.com/ProtonMail/gopenpgp/v2 v2.7.1-proton github.com/PuerkitoBio/goquery v1.8.1 github.com/abiosoft/ishell v2.0.0+incompatible diff --git a/go.sum b/go.sum index c3e5d887..2b015f2b 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/ProtonMail/go-message v0.13.1-0.20230526094639-b62c999c85b7 h1:+j+Kd/ github.com/ProtonMail/go-message v0.13.1-0.20230526094639-b62c999c85b7/go.mod h1:NBAn21zgCJ/52WLDyed18YvYFm5tEoeDauubFqLokM4= github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f h1:tCbYj7/299ekTTXpdwKYF8eBlsYsDVoggDAuAjoK66k= github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f/go.mod h1:gcr0kNtGBqin9zDW9GOHcVntrwnjrK+qdJ06mWYBybw= -github.com/ProtonMail/go-proton-api v0.4.1-0.20230704060229-a77a437ec052 h1:uIq0RX4gU9PSZ9x5b2LmJUXNOuBXRRVSOkM1RGnSy68= -github.com/ProtonMail/go-proton-api v0.4.1-0.20230704060229-a77a437ec052/go.mod h1:+aTJoYu8bqzGECXL2DOdiZTZ64bGn3w0NC8VcFpJrFM= +github.com/ProtonMail/go-proton-api v0.4.1-0.20230727082922-9115b4750ec7 h1:Rmg3TPK6vFGNWR4hxmPoBhV75Sl716iB46wEi2U4Q+c= +github.com/ProtonMail/go-proton-api v0.4.1-0.20230727082922-9115b4750ec7/go.mod h1:+aTJoYu8bqzGECXL2DOdiZTZ64bGn3w0NC8VcFpJrFM= github.com/ProtonMail/go-srp v0.0.7 h1:Sos3Qk+th4tQR64vsxGIxYpN3rdnG9Wf9K4ZloC1JrI= github.com/ProtonMail/go-srp v0.0.7/go.mod h1:giCp+7qRnMIcCvI6V6U3S1lDDXDQYx2ewJ6F/9wdlJk= github.com/ProtonMail/gopenpgp/v2 v2.7.1-proton h1:YS6M20yvjCJPR1r4ADW5TPn6rahs4iAyZaACei86bEc= diff --git a/internal/user/debug.go b/internal/user/debug.go index dbc73992..34b37d1d 100644 --- a/internal/user/debug.go +++ b/internal/user/debug.go @@ -131,7 +131,7 @@ func (apm DiagnosticMetadata) BuildMailboxToMessageMap(user *User) (map[string]A func (user *User) GetDiagnosticMetadata(ctx context.Context) (DiagnosticMetadata, error) { failedMessages := xmaps.SetFromSlice(user.vault.SyncStatus().FailedMessageIDs) - messageIDs, err := user.client.GetMessageIDs(ctx, "") + messageIDs, err := user.client.GetAllMessageIDs(ctx, "") if err != nil { return DiagnosticMetadata{}, err } diff --git a/internal/user/sync.go b/internal/user/sync.go index b0183392..717a677a 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -150,7 +150,7 @@ func (user *User) sync(ctx context.Context) error { user.log.Info("Syncing messages") // Determine which messages to sync. - messageIDs, err := user.client.GetMessageIDs(ctx, "") + messageIDs, err := getAllMessageIDs(ctx, user.client) if err != nil { return fmt.Errorf("failed to get message IDs to sync: %w", err) } @@ -198,6 +198,30 @@ func (user *User) sync(ctx context.Context) error { }) } +func getAllMessageIDs(ctx context.Context, client *proton.Client) ([]string, error) { + var messageIDs []string + + for afterID := ""; ; afterID = messageIDs[len(messageIDs)-1] { + for { + page, err := client.GetMessageIDs(ctx, afterID, 1000) + if err != nil { + if is429Error(err) { + continue + } + return nil, err + } + + if len(page) == 0 { + return messageIDs, nil + } + + messageIDs = append(messageIDs, page...) + + break + } + } +} + // nolint:exhaustive func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh ...*async.QueuedChannel[imap.Update]) error { var updates []imap.Update From d7b71aceda9c52f4c5ddbb586efa90ff307285f4 Mon Sep 17 00:00:00 2001 From: Jakub Date: Wed, 26 Jul 2023 16:11:58 +0200 Subject: [PATCH 05/15] fix(GODT-2822): retry 429 for metadata and exponential cooldown GODT-2823. --- internal/user/cooldown.go | 62 +++++++++++++++++++++++++++ internal/user/sync.go | 11 +++++ internal/user/sync_downloader.go | 6 +-- internal/user/sync_downloader_test.go | 13 +++--- internal/user/user.go | 11 +++-- 5 files changed, 87 insertions(+), 16 deletions(-) create mode 100644 internal/user/cooldown.go diff --git a/internal/user/cooldown.go b/internal/user/cooldown.go new file mode 100644 index 00000000..a0fd389a --- /dev/null +++ b/internal/user/cooldown.go @@ -0,0 +1,62 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package user + +import ( + "math/rand" + "time" +) + +const SyncRetryCooldown = 20 * time.Second + +type cooldownProvider interface { + GetNextWaitTime() time.Duration +} + +func jitter(max int) time.Duration { + return time.Duration(rand.Intn(max)) * time.Second //nolint:gosec +} + +type expCooldown struct { + count int +} + +func (c *expCooldown) GetNextWaitTime() time.Duration { + waitTimes := []time.Duration{ + 20 * time.Second, + 40 * time.Second, + 80 * time.Second, + 160 * time.Second, + 300 * time.Second, + 600 * time.Second, + } + + last := len(waitTimes) - 1 + + if c.count >= last { + return waitTimes[last] + jitter(10) + } + + c.count++ + + return waitTimes[c.count-1] + jitter(10) +} + +type noCooldown struct{} + +func (c *noCooldown) GetNextWaitTime() time.Duration { return time.Millisecond } diff --git a/internal/user/sync.go b/internal/user/sync.go index 717a677a..f224f1d4 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -439,7 +439,18 @@ func (user *User) syncMessages( metadataChunks := xslices.Chunk(messageIDs, MetadataDataPageSize) for i, metadataChunk := range metadataChunks { logrus.Debugf("Metadata Request (%v of %v), previous: %v", i, len(metadataChunks), len(downloadReq.ids)) + + cooldown := expCooldown{} + metadata, err := client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk}) + for is429Error(err) { + wait := cooldown.GetNextWaitTime() + logrus.WithError(err).Warnf("Retrying in %v to download message metadata for chunk %v", wait, i) + sleepCtx(ctx, wait) + + metadata, err = client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk}) + } + if err != nil { logrus.WithError(err).Errorf("Failed to download message metadata for chunk %v", i) downloadReq.err = err diff --git a/internal/user/sync_downloader.go b/internal/user/sync_downloader.go index e84450bb..0084ab73 100644 --- a/internal/user/sync_downloader.go +++ b/internal/user/sync_downloader.go @@ -103,7 +103,7 @@ func startSyncDownloader( return } - batch, err := downloadMessagesStage2(ctx, result, downloader, cache, SyncRetryCooldown) + batch, err := downloadMessagesStage2(ctx, result, downloader, cache, &expCooldown{}) if err != nil { errorCh <- err return @@ -281,7 +281,7 @@ func downloadMessagesStage2( state []downloadResult, downloader MessageDownloader, cache *SyncDownloadCache, - coolDown time.Duration, + cooldown cooldownProvider, ) ([]proton.FullMessage, error) { logrus.Debug("Entering download stage 2") var retryList []int @@ -289,7 +289,7 @@ func downloadMessagesStage2( for { if shouldWaitBeforeRetry { - time.Sleep(coolDown) + time.Sleep(cooldown.GetNextWaitTime()) } retryList = nil diff --git a/internal/user/sync_downloader_test.go b/internal/user/sync_downloader_test.go index c03d6061..76a9b6a4 100644 --- a/internal/user/sync_downloader_test.go +++ b/internal/user/sync_downloader_test.go @@ -23,7 +23,6 @@ import ( "io" "strings" "testing" - "time" "github.com/ProtonMail/gluon/async" "github.com/ProtonMail/go-proton-api" @@ -143,7 +142,7 @@ func TestSyncDownloader_Stage2_Everything200(t *testing.T) { }, } - result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.NoError(t, err) require.Equal(t, 2, len(result)) } @@ -171,7 +170,7 @@ func TestSyncDownloader_Stage2_Not429(t *testing.T) { }, } - _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Equal(t, msgErr, err) } @@ -195,7 +194,7 @@ func TestSyncDownloader_Stage2_API500(t *testing.T) { }, } - _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Equal(t, msgErr, err) } @@ -303,7 +302,7 @@ func TestSyncDownloader_Stage2_Some429(t *testing.T) { }) } - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.NoError(t, err) require.Equal(t, 3, len(messages)) @@ -381,7 +380,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429MessageDownload(t *testing.T) { messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(1).Return(proton.Message{}, err500) } - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Empty(t, 0, messages) } @@ -424,7 +423,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429AttachmentDownload(t *testing.T) { // 500 for second attachment messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A4"), gomock.Any()).Times(1).Return(err500) - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, time.Millisecond) + messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Empty(t, 0, messages) } diff --git a/internal/user/user.go b/internal/user/user.go index 0399fba1..6ae06287 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -55,10 +55,6 @@ var ( EventJitter = 20 * time.Second // nolint:gochecknoglobals,revive ) -const ( - SyncRetryCooldown = 20 * time.Second -) - type User struct { log *logrus.Entry @@ -244,13 +240,16 @@ func New( return } + cooldown := expCooldown{} + for { if err := ctx.Err(); err != nil { user.log.WithError(err).Error("Sync aborted") return } else if err := user.doSync(ctx); err != nil { - user.log.WithError(err).Error("Failed to sync, will retry later") - sleepCtx(ctx, SyncRetryCooldown) + wait := cooldown.GetNextWaitTime() + user.log.WithField("retry-after", wait).WithError(err).Error("Failed to sync, will retry later") + sleepCtx(ctx, wait) } else { user.log.Info("Sync complete, starting API event stream") return From e26c7683d2b5b5b5cb305107f4117d666eb1a201 Mon Sep 17 00:00:00 2001 From: Jakub Date: Mon, 31 Jul 2023 13:52:39 +0200 Subject: [PATCH 06/15] fix(GODT-2822): event loop behaviour on 429. --- internal/user/sync_downloader.go | 9 +++++++++ internal/user/user.go | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/user/sync_downloader.go b/internal/user/sync_downloader.go index 0084ab73..00d99be1 100644 --- a/internal/user/sync_downloader.go +++ b/internal/user/sync_downloader.go @@ -379,3 +379,12 @@ func is429Error(err error) bool { return false } + +func is429Or5XXError(err error) bool { + var apiErr *proton.APIError + if errors.As(err, &apiErr) { + return apiErr.Status == 429 || apiErr.Status >= 500 + } + + return false +} diff --git a/internal/user/user.go b/internal/user/user.go index 6ae06287..1c70c9f6 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -778,7 +778,7 @@ func (user *User) doEventPoll(ctx context.Context) error { } // If the error is a server-side issue, return error to retry later. - if apiErr := new(proton.APIError); errors.As(err, &apiErr) && apiErr.Status >= 500 { + if is429Or5XXError(err) { return fmt.Errorf("failed to handle event due to server error: %w", err) } From 75811d22e8787a0dbe321a24d7e5c397a8050ba8 Mon Sep 17 00:00:00 2001 From: Jakub Date: Mon, 31 Jul 2023 15:21:53 +0200 Subject: [PATCH 07/15] fix(GODT-2822): rename funcs --- internal/user/sync_downloader.go | 8 ++++---- internal/user/sync_downloader_test.go | 20 ++++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/internal/user/sync_downloader.go b/internal/user/sync_downloader.go index 00d99be1..0c6ad6a6 100644 --- a/internal/user/sync_downloader.go +++ b/internal/user/sync_downloader.go @@ -92,7 +92,7 @@ func startSyncDownloader( return } - result, err := downloadMessageStage1(ctx, panicHandler, request, downloader, attachmentDownloader, cache, syncLimits.MaxParallelDownloads) + result, err := downloadMessagesParallel(ctx, panicHandler, request, downloader, attachmentDownloader, cache, syncLimits.MaxParallelDownloads) if err != nil { errorCh <- err return @@ -103,7 +103,7 @@ func startSyncDownloader( return } - batch, err := downloadMessagesStage2(ctx, result, downloader, cache, &expCooldown{}) + batch, err := downloadMessagesSequential(ctx, result, downloader, cache, &expCooldown{}) if err != nil { errorCh <- err return @@ -229,7 +229,7 @@ func (a *attachmentDownloader) close() { a.cancel() } -func downloadMessageStage1( +func downloadMessagesParallel( ctx context.Context, panicHandler async.PanicHandler, request downloadRequest, @@ -276,7 +276,7 @@ func downloadMessageStage1( }) } -func downloadMessagesStage2( +func downloadMessagesSequential( ctx context.Context, state []downloadResult, downloader MessageDownloader, diff --git a/internal/user/sync_downloader_test.go b/internal/user/sync_downloader_test.go index 76a9b6a4..c926e0d1 100644 --- a/internal/user/sync_downloader_test.go +++ b/internal/user/sync_downloader_test.go @@ -31,7 +31,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestSyncDownloader_Stage1_429(t *testing.T) { +func TestSyncDownloader_Parallel_429(t *testing.T) { // Check 429 is correctly caught and download state recorded correctly // Message 1: All ok // Message 2: Message failed @@ -92,7 +92,7 @@ func TestSyncDownloader_Stage1_429(t *testing.T) { attachmentDownloader := newAttachmentDownloader(ctx, panicHandler, messageDownloader, cache, 1) defer attachmentDownloader.close() - result, err := downloadMessageStage1(ctx, panicHandler, requests, messageDownloader, attachmentDownloader, cache, 1) + result, err := downloadMessagesParallel(ctx, panicHandler, requests, messageDownloader, attachmentDownloader, cache, 1) require.NoError(t, err) require.Equal(t, 3, len(result)) // Check message 1 @@ -142,7 +142,7 @@ func TestSyncDownloader_Stage2_Everything200(t *testing.T) { }, } - result, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) + result, err := downloadMessagesSequential(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.NoError(t, err) require.Equal(t, 2, len(result)) } @@ -170,7 +170,7 @@ func TestSyncDownloader_Stage2_Not429(t *testing.T) { }, } - _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) + _, err := downloadMessagesSequential(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Equal(t, msgErr, err) } @@ -194,7 +194,7 @@ func TestSyncDownloader_Stage2_API500(t *testing.T) { }, } - _, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) + _, err := downloadMessagesSequential(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Equal(t, msgErr, err) } @@ -302,7 +302,7 @@ func TestSyncDownloader_Stage2_Some429(t *testing.T) { }) } - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) + messages, err := downloadMessagesSequential(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.NoError(t, err) require.Equal(t, 3, len(messages)) @@ -380,7 +380,7 @@ func TestSyncDownloader_Stage2_ErrorOnNon429MessageDownload(t *testing.T) { messageDownloader.EXPECT().GetMessage(gomock.Any(), gomock.Eq("Msg3")).Times(1).Return(proton.Message{}, err500) } - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) + messages, err := downloadMessagesSequential(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Empty(t, 0, messages) } @@ -423,12 +423,12 @@ func TestSyncDownloader_Stage2_ErrorOnNon429AttachmentDownload(t *testing.T) { // 500 for second attachment messageDownloader.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("A4"), gomock.Any()).Times(1).Return(err500) - messages, err := downloadMessagesStage2(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) + messages, err := downloadMessagesSequential(ctx, downloadResult, messageDownloader, cache, &noCooldown{}) require.Error(t, err) require.Empty(t, 0, messages) } -func TestSyncDownloader_Stage1_DoNotDownloadIfAlreadyInCache(t *testing.T) { +func TestSyncDownloader_Parallel_DoNotDownloadIfAlreadyInCache(t *testing.T) { mockCtrl := gomock.NewController(t) messageDownloader := mocks.NewMockMessageDownloader(mockCtrl) panicHandler := &async.NoopPanicHandler{} @@ -452,7 +452,7 @@ func TestSyncDownloader_Stage1_DoNotDownloadIfAlreadyInCache(t *testing.T) { cache.StoreAttachment("A1", []byte(attachmentData)) cache.StoreAttachment("A2", []byte(attachmentData)) - result, err := downloadMessageStage1(ctx, panicHandler, requests, messageDownloader, attachmentDownloader, cache, 1) + result, err := downloadMessagesParallel(ctx, panicHandler, requests, messageDownloader, attachmentDownloader, cache, 1) require.NoError(t, err) require.Equal(t, 2, len(result)) From 84adbbc461827f26bdcf884a2bea2b10309affbf Mon Sep 17 00:00:00 2001 From: Jakub Date: Tue, 1 Aug 2023 07:54:57 +0200 Subject: [PATCH 08/15] chore: Trift Bridge 3.4.0 changelog. --- Changelog.md | 37 ++++--------------------------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/Changelog.md b/Changelog.md index 578da66a..756db9a5 100644 --- a/Changelog.md +++ b/Changelog.md @@ -5,6 +5,8 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/) ## Trift Bridge 3.4.0 +### Added + ### Changed * Test: Add require.Eventually to TestBridge_UserAgentFromSMTPClient. * Test: Add smtp-send utility. @@ -12,32 +14,14 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/) * GODT-2759: Add prompt to download missing messages for analysis. * GODT-2759: CLI debug commands. * Remove gRPC auto-generated C++ source files. -* GODT-2709: Remove the config status file when user is removed. -* GODT-2749: Manual test-windows again. -* GODT-2712: Feed config_status with user action while pending. -* GODT-2715: Add Unitary test for configStatus event. -* GODT-2715: Add Functional test for configStatus telemetry event. -* GODT-2714: Apply PR comments. -* GODT-2714: Set Configuration Status to Failure and send Recovery event when issue is solved. -* GODT-2713: Send config_progress event once a day if the configuration is stucked in pending for more than a day. -* GODT-2711: Send config_abort event on User removal. -* GODT-2710: Send config success on IMAP/SMTP connection.. -* GODT-2716: Make Configuration Statistics persistent. -* GODT-2709: Init Configuration status. -* GODT-2750: Disable raise on main window when a notification is clicked on Linux. -* GODT-2748: Log calls that cause main window to show, with reason. * Test: Force all unit test to use minimum sync spec. * Test: Force sync limits to minimum with env variable. -* GODT-2749: Manual windows-test. * GODT-2691: Close logrus output file on exit. * GODT-2522: New Gluon database layout. -* GODT-2728: Remove the sentry report for gRPC event stream interruptions in bridge-gui. * GODT-2678: When internet is off, do not display status dot icon for the user in the context menu. * GODT-2686: Change the orientation of the expand/collapse arrow for Advanced settings. * Test(GODT-2636): Add step for sending from EML. -* GODT-2707: Set bridge-gui default log level to 'debug'. * Log failed message ids during sync. -* GODT-2705: Added log entries for focus service on client and server sides. * GODT-2510: Remove Ent. * Test(GODT-2600): Changing state (read/unread, starred/unstarred) of a message in integration tests. * GODT-2703: Got rid of account details dialog with Apple Mail autoconf. @@ -45,35 +29,22 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/) * GODT-2690: Update sentry reporting in GUI for new log file naming. * GODT-2668: Implemented new log retention policy. * Test(GODT-2683): Save Draft without "Date" & "From" in headers. -* Merge branch release/stone to devel. * GODT-2666: Feat(GODT-2667): introduce sessionID in bridge. -* Fix linter errors. -* GODT-2653: Log API error details on Message import and send. -* GODT-2674: Add more logs to failed update. * GODT-2660: Calculate bridge coverage and refactor CI yaml file. -* GODT-2674: Add more logs during update failed. * Fix dependency_license script to handle dot formated version. -* Add error logs when messages fail to build during sync. -* GODT-2673: Use NoClient as UserAgent without any client connected and... -* GODT-2655: Display internal build time tag in log and GUI. ### Fixed +* GODT-2812: Fix rare sync deadlock. +* GODT-2822: Better handling 429 during sync and event loop. * GODT-2763: Missing Answered flag on Sync and Message Create. -* GODT-2774: Only check telemetry availability for the current user. -* GODT-2774: Add external context to telemetry tasks. * GODT-2758: Fix panic in SetFlagsOnMessages. -* GODT-2708: Fix dimensions event format + handling of ReportClicked event. * GODT-2578: Refresh literals appended to Sent folder. -* GODT-2756: Fix for 'Settings' context menu opening the 'Help' page. * GODT-2753: Vault test now check that value auto-assigned is first available port. * GODT-2522: Handle migration with unreferenced db values. * GODT-2693: Allow missing whitespace after header field colon. -* GODT-2726: Fix Parsing of Details field in GPA error message. * GODT-2653: Only log when err is not nil. * GODT-2680: Fix for C++ debugger not working on ARM64 because of OpenSSL 3.1. * GODT-2675: Update GPA to applye togin-gonic/gin patch + update COPYING_NOTES. -* GODT-2672: Fix context cancelled when IMAP/SMTP parameters change is in progress. -* GODT-2669: Display sentry ID in bridge init log. ## Stone Bridge 3.3.2 From a9865976a38f4cf49c924790e3a2aaa97eaa2235 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Tue, 8 Aug 2023 09:02:47 +0200 Subject: [PATCH 09/15] fix(GODT-2759): Use examine rather than select for fetching When fetching messages in the debug mailbox state command, use read only mode to avoid modifying the mailbox state. --- go.mod | 2 +- go.sum | 3 +++ internal/bridge/debug.go | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 75e25390..77dd2d8c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557 github.com/Masterminds/semver/v3 v3.2.0 - github.com/ProtonMail/gluon v0.16.1-0.20230706110757-a9327fb18611 + github.com/ProtonMail/gluon v0.16.1-0.20230808074632-9b48ebb293ec github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a github.com/ProtonMail/go-proton-api v0.4.1-0.20230727082922-9115b4750ec7 github.com/ProtonMail/gopenpgp/v2 v2.7.1-proton diff --git a/go.sum b/go.sum index 2b015f2b..f7f90240 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/ProtonMail/docker-credential-helpers v1.1.0 h1:+kvUIpwWcbtP3WFv5sSvkF github.com/ProtonMail/docker-credential-helpers v1.1.0/go.mod h1:mK0aBveCxhnQ756AmaTfXMZDeULvheYVhF/MWMErN5g= github.com/ProtonMail/gluon v0.16.1-0.20230706110757-a9327fb18611 h1:QVydPr/+pgz5xihc2ujNNV+qnq3oTidIXvF0PgkcY6U= github.com/ProtonMail/gluon v0.16.1-0.20230706110757-a9327fb18611/go.mod h1:Og5/Dz1MiGpCJn51XujZwxiLG7WzvvjE5PRpZBQmAHo= +github.com/ProtonMail/gluon v0.16.1-0.20230808074632-9b48ebb293ec h1:34Uwv/5hCGtrWes3wKaR0MkqVRd0wb4sFZDqdu22Xzs= +github.com/ProtonMail/gluon v0.16.1-0.20230808074632-9b48ebb293ec/go.mod h1:Og5/Dz1MiGpCJn51XujZwxiLG7WzvvjE5PRpZBQmAHo= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4= github.com/ProtonMail/go-crypto v0.0.0-20230321155629-9a39f2531310/go.mod h1:8TI4H3IbrackdNgv+92dI+rhpCaLqM0IfpgCgenFvRE= @@ -266,6 +268,7 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/internal/bridge/debug.go b/internal/bridge/debug.go index 26f5532d..9f811467 100644 --- a/internal/bridge/debug.go +++ b/internal/bridge/debug.go @@ -230,7 +230,7 @@ func (bridge *Bridge) DebugDownloadFailedMessages( } func clientGetMessageIDs(client *goimapclient.Client, mailbox string) (map[string]imap.FlagSet, error) { - status, err := client.Select(mailbox, false) + status, err := client.Select(mailbox, true) if err != nil { return nil, err } From c35344d6f1aee66cfb9140058b50c376a3dd7196 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Tue, 8 Aug 2023 12:36:19 +0200 Subject: [PATCH 10/15] fix(GODT-2833): Fix migration of message flags Migration of message flags was incomplete, leading to incorrect state after migration. https://github.com/ProtonMail/gluon/pull/388 --- go.mod | 2 +- go.sum | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 77dd2d8c..5cdf8ab2 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557 github.com/Masterminds/semver/v3 v3.2.0 - github.com/ProtonMail/gluon v0.16.1-0.20230808074632-9b48ebb293ec + github.com/ProtonMail/gluon v0.16.1-0.20230808094407-85a10f17ae92 github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a github.com/ProtonMail/go-proton-api v0.4.1-0.20230727082922-9115b4750ec7 github.com/ProtonMail/gopenpgp/v2 v2.7.1-proton diff --git a/go.sum b/go.sum index f7f90240..69f79877 100644 --- a/go.sum +++ b/go.sum @@ -23,10 +23,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo= github.com/ProtonMail/docker-credential-helpers v1.1.0 h1:+kvUIpwWcbtP3WFv5sSvkFn/XLzSqPOB5AAthuk9xPk= github.com/ProtonMail/docker-credential-helpers v1.1.0/go.mod h1:mK0aBveCxhnQ756AmaTfXMZDeULvheYVhF/MWMErN5g= -github.com/ProtonMail/gluon v0.16.1-0.20230706110757-a9327fb18611 h1:QVydPr/+pgz5xihc2ujNNV+qnq3oTidIXvF0PgkcY6U= -github.com/ProtonMail/gluon v0.16.1-0.20230706110757-a9327fb18611/go.mod h1:Og5/Dz1MiGpCJn51XujZwxiLG7WzvvjE5PRpZBQmAHo= -github.com/ProtonMail/gluon v0.16.1-0.20230808074632-9b48ebb293ec h1:34Uwv/5hCGtrWes3wKaR0MkqVRd0wb4sFZDqdu22Xzs= -github.com/ProtonMail/gluon v0.16.1-0.20230808074632-9b48ebb293ec/go.mod h1:Og5/Dz1MiGpCJn51XujZwxiLG7WzvvjE5PRpZBQmAHo= +github.com/ProtonMail/gluon v0.16.1-0.20230808094407-85a10f17ae92 h1:yoaUatxdB6EXChiWdfIBpasJJxrQ6dHJknG0hwBAqmQ= +github.com/ProtonMail/gluon v0.16.1-0.20230808094407-85a10f17ae92/go.mod h1:Og5/Dz1MiGpCJn51XujZwxiLG7WzvvjE5PRpZBQmAHo= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4= github.com/ProtonMail/go-crypto v0.0.0-20230321155629-9a39f2531310/go.mod h1:8TI4H3IbrackdNgv+92dI+rhpCaLqM0IfpgCgenFvRE= @@ -268,7 +266,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= From e6ab874308d1e08ac16af105c0361cd38a3fb342 Mon Sep 17 00:00:00 2001 From: Jakub Date: Tue, 8 Aug 2023 15:06:16 +0200 Subject: [PATCH 11/15] chore: Trift Bridge 3.4.1 changelog. --- Changelog.md | 7 +++++++ Makefile | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/Changelog.md b/Changelog.md index 756db9a5..f968d2a5 100644 --- a/Changelog.md +++ b/Changelog.md @@ -3,6 +3,13 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/) +## Trift Bridge 3.4.1 + +### Fixed +* GODT-2833: Fix migration of message flags. +* GODT-2759: Use examine rather than select for fetching. + + ## Trift Bridge 3.4.0 ### Added diff --git a/Makefile b/Makefile index fd200737..e61b1f7d 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST)))) .PHONY: build build-gui build-nogui build-launcher versioner hasher # Keep version hardcoded so app build works also without Git repository. -BRIDGE_APP_VERSION?=3.4.0+git +BRIDGE_APP_VERSION?=3.4.1+git APP_VERSION:=${BRIDGE_APP_VERSION} APP_FULL_NAME:=Proton Mail Bridge APP_VENDOR:=Proton AG From 5ca9ec6674cd8ed01b52c1a570c0fcdcf451b9c9 Mon Sep 17 00:00:00 2001 From: Romain LE JEUNE Date: Wed, 9 Aug 2023 17:28:28 +0200 Subject: [PATCH 12/15] fix(GODT-2859): Trigger user resync while updating from 3.4.0 to 3.4.1. --- internal/bridge/migration.go | 54 ++++++++++++++++++++++++++++++++++++ internal/bridge/user.go | 3 ++ 2 files changed, 57 insertions(+) create mode 100644 internal/bridge/migration.go diff --git a/internal/bridge/migration.go b/internal/bridge/migration.go new file mode 100644 index 00000000..975ae890 --- /dev/null +++ b/internal/bridge/migration.go @@ -0,0 +1,54 @@ +// Copyright (c) 2023 Proton AG +// +// This file is part of Proton Mail Bridge. +// +// Proton Mail Bridge is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Proton Mail Bridge is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Proton Mail Bridge. If not, see . + +package bridge + +import ( + "github.com/ProtonMail/gluon/reporter" + "github.com/ProtonMail/proton-bridge/v3/internal/vault" + "github.com/sirupsen/logrus" + "strings" +) + +func (bridge *Bridge) databaseResyncNeeded() bool { + if strings.HasPrefix(bridge.lastVersion.String(), "3.4.0") && + strings.HasPrefix(bridge.curVersion.String(), "3.4.1") { + logrus.WithFields(logrus.Fields{ + "lastVersion": bridge.lastVersion.String(), + "currVersion": bridge.curVersion.String(), + }).Warning("Database re-synchronisation needed") + return true + } + return false +} + +func (bridge *Bridge) TryMigrate(vault *vault.User) { + if bridge.databaseResyncNeeded() { + if err := bridge.reporter.ReportMessage("Database need to be re-sync for migration."); err != nil { + logrus.WithError(err).Error("Failed to report database re-sync for migration.") + } + if err := vault.ClearSyncStatus(); err != nil { + logrus.WithError(err).Error("Failed reset to SyncStatus.") + if err2 := bridge.reporter.ReportMessageWithContext("Failed to reset SyncStatus for Database migration.", + reporter.Context{ + "error": err, + }); err2 != nil { + logrus.WithError(err2).Error("Failed to report reset SyncStatus error.") + } + } + } +} diff --git a/internal/bridge/user.go b/internal/bridge/user.go index 000e5ccd..93af1643 100644 --- a/internal/bridge/user.go +++ b/internal/bridge/user.go @@ -524,6 +524,9 @@ func (bridge *Bridge) addUserWithVault( return fmt.Errorf("failed to get Statistics directory: %w", err) } + // re-set SyncStatus if database need to be re-sync for migration. + bridge.TryMigrate(vault) + user, err := user.New( ctx, vault, From b24937b666049ff0f109347cca0e47a1ff90f85a Mon Sep 17 00:00:00 2001 From: Romain LE JEUNE Date: Thu, 10 Aug 2023 08:30:46 +0200 Subject: [PATCH 13/15] fix(GODT-2859): Fix lint. --- internal/bridge/migration.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/bridge/migration.go b/internal/bridge/migration.go index 975ae890..a370ca93 100644 --- a/internal/bridge/migration.go +++ b/internal/bridge/migration.go @@ -18,10 +18,11 @@ package bridge import ( + "strings" + "github.com/ProtonMail/gluon/reporter" "github.com/ProtonMail/proton-bridge/v3/internal/vault" "github.com/sirupsen/logrus" - "strings" ) func (bridge *Bridge) databaseResyncNeeded() bool { From d72980e443ea2aeb589564cbdfe15cfde87721d9 Mon Sep 17 00:00:00 2001 From: Romain LE JEUNE Date: Thu, 10 Aug 2023 08:32:54 +0200 Subject: [PATCH 14/15] fix(GODT-2859): Fix scope of the function + rename properly. --- internal/bridge/migration.go | 2 +- internal/bridge/user.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/bridge/migration.go b/internal/bridge/migration.go index a370ca93..e0a1d630 100644 --- a/internal/bridge/migration.go +++ b/internal/bridge/migration.go @@ -37,7 +37,7 @@ func (bridge *Bridge) databaseResyncNeeded() bool { return false } -func (bridge *Bridge) TryMigrate(vault *vault.User) { +func (bridge *Bridge) migrateUser(vault *vault.User) { if bridge.databaseResyncNeeded() { if err := bridge.reporter.ReportMessage("Database need to be re-sync for migration."); err != nil { logrus.WithError(err).Error("Failed to report database re-sync for migration.") diff --git a/internal/bridge/user.go b/internal/bridge/user.go index 93af1643..d60aead6 100644 --- a/internal/bridge/user.go +++ b/internal/bridge/user.go @@ -524,8 +524,8 @@ func (bridge *Bridge) addUserWithVault( return fmt.Errorf("failed to get Statistics directory: %w", err) } - // re-set SyncStatus if database need to be re-sync for migration. - bridge.TryMigrate(vault) + // re-set SyncStatus if database need to be re-synced for migration. + bridge.migrateUser(vault) user, err := user.New( ctx, From 0a555bf7670baf9c4be631de9ea4c6bd05b22e8e Mon Sep 17 00:00:00 2001 From: Jakub Date: Mon, 14 Aug 2023 08:33:51 +0200 Subject: [PATCH 15/15] chore: Trift Bridge 3.4.1 changelog. --- Changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/Changelog.md b/Changelog.md index f968d2a5..e635f946 100644 --- a/Changelog.md +++ b/Changelog.md @@ -6,6 +6,7 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/) ## Trift Bridge 3.4.1 ### Fixed +* GODT-2859: Trigger user resync while updating from 3.4.0 to 3.4.1. * GODT-2833: Fix migration of message flags. * GODT-2759: Use examine rather than select for fetching.