fix(GODT-2829): Fix new sync service bugs

* Fix wrong context use for message downloads
* Fix delete of sync data failing due ErrNotFound
* Pre-allocate attachment data buffer before download
* Fix calculation of progress if message count is higher than total
This commit is contained in:
Leander Beernaert
2023-08-28 11:23:57 +02:00
parent 1fa0d77b10
commit 0b9b886039
11 changed files with 118 additions and 55 deletions

View File

@ -57,10 +57,20 @@ func (rep *syncReporter) OnError(ctx context.Context, err error) {
func (rep *syncReporter) OnProgress(ctx context.Context, delta int64) {
rep.count += delta
var progress float64
// It's possible for count to be bigger or smaller than total depending on when the sync begins and whether new
// messages are added/removed during this period. When this happens just limited the progress to 100%.
if rep.count > rep.total {
progress = 1
} else {
progress = float64(rep.count) / float64(rep.total)
}
if time.Since(rep.last) > rep.freq {
rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{
UserID: rep.userID,
Progress: float64(rep.count) / float64(rep.total),
Progress: progress,
Elapsed: time.Since(rep.start),
Remaining: time.Since(rep.start) * time.Duration(rep.total-(rep.count+1)) / time.Duration(rep.count+1),
})

View File

@ -222,7 +222,11 @@ func (s *SyncState) loadUnsafe() error {
func DeleteSyncState(configDir, userID string) error {
path := getSyncConfigPath(configDir, userID)
return os.Remove(path)
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
return nil
}
func MigrateVaultSettings(

View File

@ -359,33 +359,33 @@ func (sm *Service) handleAddIMAPUserImpl(ctx context.Context,
} else if isNew {
panic("IMAP user should already have a database")
}
}
status, err := syncStateProvider.GetSyncStatus(ctx)
if err != nil {
return fmt.Errorf("failed to get sync status: %w", err)
}
if !status.HasLabels {
// Otherwise, the DB already exists -- if the labels are not yet synced, we need to re-create the DB.
if err := sm.imapServer.RemoveUser(ctx, gluonID, true); err != nil {
return fmt.Errorf("failed to remove old IMAP user: %w", err)
}
if err := idProvider.RemoveGluonID(addrID, gluonID); err != nil {
return fmt.Errorf("failed to remove old IMAP user ID: %w", err)
}
gluonID, err := sm.imapServer.AddUser(ctx, connector, idProvider.GluonKey())
} else {
status, err := syncStateProvider.GetSyncStatus(ctx)
if err != nil {
return fmt.Errorf("failed to add IMAP user: %w", err)
return fmt.Errorf("failed to get sync status: %w", err)
}
if err := idProvider.SetGluonID(addrID, gluonID); err != nil {
return fmt.Errorf("failed to set IMAP user ID: %w", err)
}
if !status.HasLabels {
// Otherwise, the DB already exists -- if the labels are not yet synced, we need to re-create the DB.
if err := sm.imapServer.RemoveUser(ctx, gluonID, true); err != nil {
return fmt.Errorf("failed to remove old IMAP user: %w", err)
}
log.WithField("gluonID", gluonID).Info("Re-created IMAP user")
if err := idProvider.RemoveGluonID(addrID, gluonID); err != nil {
return fmt.Errorf("failed to remove old IMAP user ID: %w", err)
}
gluonID, err := sm.imapServer.AddUser(ctx, connector, idProvider.GluonKey())
if err != nil {
return fmt.Errorf("failed to add IMAP user: %w", err)
}
if err := idProvider.SetGluonID(addrID, gluonID); err != nil {
return fmt.Errorf("failed to set IMAP user ID: %w", err)
}
log.WithField("gluonID", gluonID).Info("Re-created IMAP user")
}
}
} else {
log.Info("Creating new IMAP user")

View File

@ -58,7 +58,7 @@ func newSyncLimits(maxSyncMemory uint64) syncLimits {
MinMessageBuildingMem: 64 * Megabyte,
// Maximum recommend value for parallel downloads by the API team.
MaxParallelDownloads: 20,
MaxParallelDownloads: 32,
MaxSyncMemory: maxSyncMemory,
}

View File

@ -49,7 +49,7 @@ func NewService(reporter reporter.Reporter,
return &Service{
limits: limits,
metadataStage: NewMetadataStage(metaCh, downloadCh, limits.DownloadRequestMem, panicHandler),
downloadStage: NewDownloadStage(downloadCh, buildCh, 20, panicHandler),
downloadStage: NewDownloadStage(downloadCh, buildCh, limits.MaxParallelDownloads, panicHandler),
buildStage: NewBuildStage(buildCh, applyCh, limits.MessageBuildMem, panicHandler, reporter),
applyStage: NewApplyStage(applyCh),
metaCh: metaCh,

View File

@ -76,7 +76,7 @@ func (a *ApplyStage) run(ctx context.Context) {
continue
}
if err := req.job.updateApplier.ApplySyncUpdates(ctx, req.messages); err != nil {
if err := req.job.updateApplier.ApplySyncUpdates(req.getContext(), req.messages); err != nil {
a.log.WithError(err).Error("Failed to apply sync updates")
req.job.onError(err)
continue

View File

@ -18,6 +18,7 @@
package syncservice
import (
"bytes"
"context"
"errors"
"sync/atomic"
@ -58,7 +59,7 @@ func NewDownloadStage(
return &DownloadStage{
input: input,
output: output,
maxParallelDownloads: maxParallelDownloads,
maxParallelDownloads: maxParallelDownloads * 2,
panicHandler: panicHandler,
log: logrus.WithField("sync-stage", "download"),
}
@ -94,7 +95,7 @@ func (d *DownloadStage) run(ctx context.Context) {
// Step 1: Download Messages.
result, err := autoDownloadRate(
ctx,
request.getContext(),
&DefaultDownloadRateModifier{},
request.job.client,
d.maxParallelDownloads,
@ -155,14 +156,15 @@ func (d *DownloadStage) run(ctx context.Context) {
// Step 3: Download attachments data to the message.
attachments, err := autoDownloadRate(
ctx,
request.getContext(),
&DefaultDownloadRateModifier{},
request.job.client,
d.maxParallelDownloads,
attachmentIndices,
newCoolDown,
func(ctx context.Context, client APIClient, input attachmentMeta) ([]byte, error) {
return downloadAttachment(ctx, request.job.downloadCache, client, result[input.msgIdx].Attachments[input.attIdx].ID)
attachment := result[input.msgIdx].Attachments[input.attIdx]
return downloadAttachment(ctx, request.job.downloadCache, client, attachment.ID, attachment.Size)
},
)
if err != nil {
@ -202,17 +204,22 @@ func downloadMessage(ctx context.Context, cache *DownloadCache, client APIClient
return msg, nil
}
func downloadAttachment(ctx context.Context, cache *DownloadCache, client APIClient, id string) ([]byte, error) {
func downloadAttachment(ctx context.Context, cache *DownloadCache, client APIClient, id string, size int64) ([]byte, error) {
data, ok := cache.GetAttachment(id)
if ok {
return data, nil
}
data, err := client.GetAttachment(ctx, id)
if err != nil {
var buffer bytes.Buffer
buffer.Grow(int(size))
if err := client.GetAttachmentInto(ctx, id, &buffer); err != nil {
return nil, err
}
data = buffer.Bytes()
cache.StoreAttachment(id, data)
return data, nil
@ -236,6 +243,10 @@ func autoDownloadRate[T any, R any](
proton429or5xxCounter := int32(0)
parallelTasks := maxParallelDownloads
for _, chunk := range xslices.Chunk(data, maxParallelDownloads) {
if err := ctx.Err(); err != nil {
return nil, err
}
parallelTasks = modifier.Apply(atomic.LoadInt32(&proton429or5xxCounter) != 0, parallelTasks, maxParallelDownloads)
atomic.StoreInt32(&proton429or5xxCounter, 0)

View File

@ -18,6 +18,7 @@
package syncservice
import (
"bytes"
"context"
"errors"
"fmt"
@ -61,9 +62,9 @@ func TestDownloadAttachment_NotInCache(t *testing.T) {
mockCtrl := gomock.NewController(t)
client := NewMockAPIClient(mockCtrl)
cache := newDownloadCache()
client.EXPECT().GetAttachment(gomock.Any(), gomock.Any()).Return(nil, nil)
client.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
_, err := downloadAttachment(context.Background(), cache, client, "id")
_, err := downloadAttachment(context.Background(), cache, client, "id", 1024)
require.NoError(t, err)
}
@ -74,7 +75,7 @@ func TestDownloadAttachment_InCache(t *testing.T) {
attachment := []byte("hello world")
cache.StoreAttachment("id", attachment)
downloaded, err := downloadAttachment(context.Background(), cache, client, "id")
downloaded, err := downloadAttachment(context.Background(), cache, client, "id", 1024)
require.NoError(t, err)
require.Equal(t, attachment, downloaded)
}
@ -343,7 +344,7 @@ func TestDownloadStage_JobAbortsOnAttachmentDownloadError(t *testing.T) {
ID: "attach",
}},
}, nil)
tj.client.EXPECT().GetAttachment(gomock.Any(), gomock.Eq("attach")).Return(nil, expectedErr)
tj.client.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq("attach"), gomock.Any()).Return(expectedErr)
tj.job.begin()
childJob := tj.job.newChildJob("f", 10)
@ -403,7 +404,13 @@ func buildDownloadStageData(tj *tjob, numMessages int, with422 bool) ([]string,
tj.client.EXPECT().GetMessage(gomock.Any(), gomock.Eq(m.ID)).Return(m.Message, nil)
for idx, a := range m.Attachments {
tj.client.EXPECT().GetAttachment(gomock.Any(), gomock.Eq(a.ID)).Return(m.AttData[idx], nil)
attData := m.AttData[idx]
tj.client.EXPECT().GetAttachmentInto(gomock.Any(), gomock.Eq(a.ID), gomock.Any()).DoAndReturn(
func(_ context.Context, _ string, b *bytes.Buffer) error {
_, err := b.Write(attData)
return err
},
)
}
}

View File

@ -57,8 +57,8 @@ func NewMetadataStage(
}
}
const MetadataPageSize = 150
const MetadataMaxMessages = 250
const MetadataPageSize = 128
const MetadataMaxMessages = 64
func (m *MetadataStage) Run(group *async.Group) {
group.Once(func(ctx context.Context) {

View File

@ -0,0 +1,42 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package user
import (
"fmt"
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
"github.com/ProtonMail/proton-bridge/v3/internal/vault"
)
func migrateSyncStatusFromVault(encVault *vault.User, syncConfigDir string, userID string) error {
syncStatus := encVault.SyncStatus()
migrated, err := imapservice.MigrateVaultSettings(syncConfigDir, userID, syncStatus.HasLabels, syncStatus.HasMessages, syncStatus.FailedMessageIDs)
if err != nil {
return fmt.Errorf("failed to migrate user sync settings: %w", err)
}
if migrated {
if err := encVault.ClearSyncStatus(); err != nil {
return fmt.Errorf("failed to clear sync settings from vault: %w", err)
}
}
return nil
}

View File

@ -156,19 +156,8 @@ func newImpl(
logrus.WithField("userID", apiUser.ID).Info("Creating new user")
// Migrate Sync Status from Vault.
{
syncStatus := encVault.SyncStatus()
migrated, err := imapservice.MigrateVaultSettings(syncConfigDir, apiUser.ID, syncStatus.HasLabels, syncStatus.HasMessages, syncStatus.FailedMessageIDs)
if err != nil {
return nil, fmt.Errorf("failed to migrate user sync settings: %w", err)
}
if migrated {
if err := encVault.ClearSyncStatus(); err != nil {
return nil, fmt.Errorf("failed to clear sync settings from vault: %w", err)
}
}
if err := migrateSyncStatusFromVault(encVault, syncConfigDir, apiUser.ID); err != nil {
return nil, err
}
// Get the user's API addresses.