feat(GODT-2829): Integrate new sync service

Update the IMAP service to use the new sync service.

The new sync state is stored as a simple file on disk to avoid contention
with concurrent vault writes.
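
For context, a minimal sketch of the on-disk format and the atomic-replace write introduced in syncstate.go further down in this diff. Only names visible in the diff (syncStateFile, SyncFileVersion) are reused; the store helper here is illustrative, the real code lives in SyncState.storeUnsafe/storeImpl.

package sketch

import (
	"encoding/json"
	"fmt"
	"os"
)

// Versioned envelope mirroring syncStateFile below: the payload is kept as a
// JSON string so older versions can be detected and migrated.
const SyncFileVersion = 1

type syncStateFile struct {
	Version int
	Data    string
}

// store marshals the state, writes it to a sibling temp file, and renames it
// into place; the rename replaces the old file atomically on POSIX systems,
// so a crash mid-write never leaves a torn state file.
func store(path string, payload any) error {
	data, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("failed to marshal sync state data: %w", err)
	}
	fileData, err := json.Marshal(syncStateFile{Version: SyncFileVersion, Data: string(data)})
	if err != nil {
		return fmt.Errorf("failed to marshal sync state file: %w", err)
	}
	tmpFile := path + ".tmp"
	if err := os.WriteFile(tmpFile, fileData, 0o600); err != nil {
		return fmt.Errorf("failed to write sync state to tmp file: %w", err)
	}
	return os.Rename(tmpFile, path)
}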
Leander Beernaert
2023-08-25 15:03:41 +02:00
parent aa77a67a1c
commit efbe84964f
27 changed files with 864 additions and 1176 deletions

go.sum
View File

@ -266,7 +266,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=

View File

@ -41,6 +41,7 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
"github.com/ProtonMail/proton-bridge/v3/internal/sentry"
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapsmtpserver"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
"github.com/ProtonMail/proton-bridge/v3/internal/telemetry"
"github.com/ProtonMail/proton-bridge/v3/internal/user"
"github.com/ProtonMail/proton-bridge/v3/internal/vault"
@ -127,6 +128,7 @@ type Bridge struct {
goHeartbeat func()
serverManager *imapsmtpserver.Service
syncService *syncservice.Service
}
// New creates a new bridge.
@ -268,7 +270,8 @@ func newBridge(
firstStart: firstStart,
lastVersion: lastVersion,
tasks: tasks,
tasks: tasks,
syncService: syncservice.NewService(reporter, panicHandler),
}
bridge.serverManager = imapsmtpserver.NewService(context.Background(),
@ -285,6 +288,8 @@ func newBridge(
return nil, err
}
bridge.syncService.Run(bridge.tasks)
return bridge, nil
}

View File

@ -32,6 +32,7 @@ type Locator interface {
GetLicenseFilePath() string
GetDependencyLicensesLink() string
Clear(...string) error
ProvideIMAPSyncConfigPath() (string, error)
}
type ProxyController interface {

View File

@ -30,6 +30,7 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/logging"
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
"github.com/ProtonMail/proton-bridge/v3/internal/try"
"github.com/ProtonMail/proton-bridge/v3/internal/user"
"github.com/ProtonMail/proton-bridge/v3/internal/vault"
@ -243,6 +244,11 @@ func (bridge *Bridge) LogoutUser(ctx context.Context, userID string) error {
func (bridge *Bridge) DeleteUser(ctx context.Context, userID string) error {
logrus.WithField("userID", userID).Info("Deleting user")
syncConfigDir, err := bridge.locator.ProvideIMAPSyncConfigPath()
if err != nil {
return fmt.Errorf("failed to get sync config path")
}
return safe.LockRet(func() error {
if !bridge.vault.HasUser(userID) {
return ErrNoSuchUser
@ -252,6 +258,10 @@ func (bridge *Bridge) DeleteUser(ctx context.Context, userID string) error {
bridge.logoutUser(ctx, user, true, true, !bridge.GetTelemetryDisabled())
}
if err := imapservice.DeleteSyncState(syncConfigDir, userID); err != nil {
return fmt.Errorf("failed to delete use sync config")
}
if err := bridge.vault.DeleteUser(userID); err != nil {
logrus.WithError(err).Error("Failed to delete vault user")
}
@ -510,6 +520,11 @@ func (bridge *Bridge) addUserWithVault(
return fmt.Errorf("failed to get Statistics directory: %w", err)
}
syncSettingsPath, err := bridge.locator.ProvideIMAPSyncConfigPath()
if err != nil {
return fmt.Errorf("failed to get IMAP sync config path: %w", err)
}
user, err := user.New(
ctx,
vault,
@ -524,6 +539,8 @@ func (bridge *Bridge) addUserWithVault(
bridge.serverManager,
bridge.serverManager,
&bridgeEventSubscription{b: bridge},
bridge.syncService,
syncSettingsPath,
)
if err != nil {
return fmt.Errorf("failed to create user: %w", err)

View File

@ -198,6 +198,14 @@ func (l *Locations) ProvideStatsPath() (string, error) {
return l.getStatsPath(), nil
}
func (l *Locations) ProvideIMAPSyncConfigPath() (string, error) {
if err := os.MkdirAll(l.getIMAPSyncConfigPath(), 0o700); err != nil {
return "", err
}
return l.getIMAPSyncConfigPath(), nil
}
func (l *Locations) getGluonCachePath() string {
return filepath.Join(l.userData, "gluon")
}
@ -214,6 +222,10 @@ func (l *Locations) getSettingsPath() string {
return l.userConfig
}
func (l *Locations) getIMAPSyncConfigPath() string {
return filepath.Join(l.userConfig, "imap-sync")
}
func (l *Locations) getLogsPath() string {
return filepath.Join(l.userData, "logs")
}

View File

@ -35,6 +35,7 @@ type APIClient interface {
UnlabelMessages(ctx context.Context, messageIDs []string, labelID string) error
GetLabels(ctx context.Context, labelTypes ...proton.LabelType) ([]proton.Label, error)
GetGroupedMessageCount(ctx context.Context) ([]proton.MessageGroupCount, error)
GetMessage(ctx context.Context, messageID string) (proton.Message, error)
GetMessageMetadataPage(ctx context.Context, page, pageSize int, filter proton.MessageFilter) ([]proton.MessageMetadata, error)
GetAllMessageIDs(ctx context.Context, afterID string) ([]string, error)

View File

@ -18,7 +18,6 @@
package imapservice
import (
"context"
"fmt"
"net/mail"
"time"
@ -181,11 +180,3 @@ func wantLabels(apiLabels map[string]proton.Label, labelIDs []string) []string {
return WantLabel(apiLabel)
})
}
// sleepCtx sleeps for the given duration, or until the context is canceled.
func sleepCtx(ctx context.Context, d time.Duration) {
select {
case <-ctx.Done():
case <-time.After(d):
}
}

View File

@ -21,6 +21,7 @@ import (
"context"
"github.com/ProtonMail/gluon/connector"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
)
type IMAPServerManager interface {
@ -29,7 +30,7 @@ type IMAPServerManager interface {
connector connector.Connector,
addrID string,
idProvider GluonIDProvider,
syncStateProvider SyncStateProvider,
syncStateProvider syncservice.StateProvider,
) error
RemoveIMAPUser(ctx context.Context, deleteData bool, provider GluonIDProvider, addrID ...string) error
@ -42,7 +43,7 @@ func (n NullIMAPServerManager) AddIMAPUser(
_ connector.Connector,
_ string,
_ GluonIDProvider,
_ SyncStateProvider,
_ syncservice.StateProvider,
) error {
return nil
}

View File

@ -19,7 +19,10 @@ package imapservice
import (
"context"
"errors"
"fmt"
"path/filepath"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/reporter"
@ -28,12 +31,13 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
"github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
"github.com/ProtonMail/proton-bridge/v3/internal/services/userevents"
"github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
"github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
"github.com/ProtonMail/proton-bridge/v3/internal/vault"
"github.com/ProtonMail/proton-bridge/v3/pkg/cpc"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
)
type EventProvider interface {
@ -55,16 +59,6 @@ type GluonIDProvider interface {
GluonKey() []byte
}
type SyncStateProvider interface {
AddFailedMessageID(messageID string) error
RemFailedMessageID(messageID string) error
GetSyncStatus() vault.SyncStatus
ClearSyncStatus() error
SetHasLabels(bool) error
SetHasMessages(bool) error
SetLastMessageID(messageID string) error
}
type Service struct {
log *logrus.Entry
cpc *cpc.CPC
@ -76,11 +70,10 @@ type Service struct {
subscription *userevents.EventChanneledSubscriber
gluonIDProvider GluonIDProvider
syncStateProvider SyncStateProvider
eventProvider EventProvider
serverManager IMAPServerManager
eventPublisher events.EventPublisher
gluonIDProvider GluonIDProvider
eventProvider EventProvider
serverManager IMAPServerManager
eventPublisher events.EventPublisher
telemetry Telemetry
panicHandler async.PanicHandler
@ -92,14 +85,20 @@ type Service struct {
connectors map[string]*Connector
maxSyncMemory uint64
showAllMail bool
syncHandler *syncHandler
syncHandler *syncservice.Handler
syncUpdateApplier *SyncUpdateApplier
syncMessageBuilder *SyncMessageBuilder
syncStateProvider *SyncState
syncReporter *syncReporter
syncConfigPath string
}
func NewService(
client APIClient,
identityState *useridentity.State,
gluonIDProvider GluonIDProvider,
syncStateProvider SyncStateProvider,
eventProvider EventProvider,
serverManager IMAPServerManager,
eventPublisher events.EventPublisher,
@ -111,27 +110,34 @@ func NewService(
reporter reporter.Reporter,
addressMode usertypes.AddressMode,
subscription events.Subscription,
syncConfigDir string,
maxSyncMemory uint64,
showAllMail bool,
) *Service {
subscriberName := fmt.Sprintf("imap-%v", identityState.User.ID)
log := logrus.WithFields(logrus.Fields{
"user": identityState.User.ID,
"service": "imap",
})
rwIdentity := newRWIdentity(identityState, bridgePassProvider, keyPassProvider)
syncUpdateApplier := NewSyncUpdateApplier()
syncMessageBuilder := NewSyncMessageBuilder(rwIdentity)
syncReporter := newSyncReporter(identityState.User.ID, eventPublisher, time.Second)
return &Service{
cpc: cpc.NewCPC(),
log: logrus.WithFields(logrus.Fields{
"user": identityState.User.ID,
"service": "imap",
}),
cpc: cpc.NewCPC(),
client: client,
identityState: newRWIdentity(identityState, bridgePassProvider, keyPassProvider),
log: log,
identityState: rwIdentity,
labels: newRWLabels(),
addressMode: addressMode,
gluonIDProvider: gluonIDProvider,
serverManager: serverManager,
syncStateProvider: syncStateProvider,
eventProvider: eventProvider,
eventPublisher: eventPublisher,
gluonIDProvider: gluonIDProvider,
serverManager: serverManager,
eventProvider: eventProvider,
eventPublisher: eventPublisher,
subscription: userevents.NewEventSubscriber(subscriberName),
@ -146,10 +152,31 @@ func NewService(
eventWatcher: subscription.Add(events.IMAPServerCreated{}),
eventSubscription: subscription,
showAllMail: showAllMail,
syncUpdateApplier: syncUpdateApplier,
syncMessageBuilder: syncMessageBuilder,
syncReporter: syncReporter,
syncConfigPath: getSyncConfigPath(syncConfigDir, identityState.User.ID),
}
}
func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) error {
func (s *Service) Start(
ctx context.Context,
group *orderedtasks.OrderedCancelGroup,
syncRegulator syncservice.Regulator,
) error {
{
syncStateProvider, err := NewSyncState(s.syncConfigPath)
if err != nil {
return fmt.Errorf("failed to load sync state: %w", err)
}
s.syncStateProvider = syncStateProvider
}
s.syncHandler = syncservice.NewHandler(syncRegulator, s.client, s.identityState.UserID(), s.syncStateProvider, s.log, s.panicHandler)
// Get user labels
apiLabels, err := s.client.GetLabels(ctx, proton.LabelTypeSystem, proton.LabelTypeFolder, proton.LabelTypeLabel)
if err != nil {
@ -227,6 +254,10 @@ func (s *Service) GetLabels(ctx context.Context) (map[string]proton.Label, error
return cpc.SendTyped[map[string]proton.Label](ctx, s.cpc, &getLabelsReq{})
}
func (s *Service) GetSyncFailedMessageIDs(ctx context.Context) ([]string, error) {
return cpc.SendTyped[[]string](ctx, s.cpc, &getSyncFailedMessagesReq{})
}
func (s *Service) Close() {
for _, c := range s.connectors {
c.StateClose()
@ -251,7 +282,7 @@ func (s *Service) HandleRefreshEvent(ctx context.Context, _ proton.RefreshFlag)
return err
}
if err := s.syncStateProvider.ClearSyncStatus(); err != nil {
if err := s.syncStateProvider.ClearSyncStatus(ctx); err != nil {
return fmt.Errorf("failed to clear sync status:%w", err)
}
@ -259,7 +290,7 @@ func (s *Service) HandleRefreshEvent(ctx context.Context, _ proton.RefreshFlag)
return err
}
s.syncHandler.launch(s)
s.startSyncing()
return nil
}
@ -279,11 +310,9 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
defer s.cpc.Close()
defer s.eventSubscription.Remove(s.eventWatcher)
s.syncHandler = newSyncHandler(ctx, s.panicHandler)
defer s.syncHandler.Close()
s.syncHandler.launch(s)
s.startSyncing()
eventHandler := userevents.EventHandler{
UserHandler: s,
@ -307,7 +336,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
}
switch r := req.Value().(type) {
case *setAddressModeReq:
err := s.setAddressMode(ctx, s.syncHandler, r.mode)
err := s.setAddressMode(ctx, r.mode)
req.Reply(ctx, nil, err)
case *resyncReq:
@ -325,7 +354,7 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
s.log.Info("Resuming sync")
// Cancel previous run, if any, just in case.
s.syncHandler.CancelAndWait()
s.syncHandler.launch(s)
s.startSyncing()
req.Reply(ctx, nil, nil)
case *getLabelsReq:
labels := s.labels.GetLabelMap()
@ -347,6 +376,15 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
req.Reply(ctx, nil, nil)
s.setShowAllMail(r.v)
case *getSyncFailedMessagesReq:
status, err := s.syncStateProvider.GetSyncStatus(ctx)
if err != nil {
req.Reply(ctx, nil, fmt.Errorf("failed to get sync status: %w", err))
continue
}
req.Reply(ctx, maps.Keys(status.FailedMessages), nil)
default:
s.log.Error("Received unknown request")
}
@ -366,11 +404,19 @@ func (s *Service) run(ctx context.Context) { //nolint gocyclo
s.eventProvider.ResumeIMAP()
}
case update, ok := <-s.syncHandler.updater.ch:
case request, ok := <-s.syncUpdateApplier.requestCh:
if !ok {
continue
}
s.onSyncUpdate(ctx, update)
updates, err := request(ctx, s.addressMode, s.connectors)
if err := s.syncUpdateApplier.reply(ctx, updates, err); err != nil {
if !errors.Is(err, context.Canceled) {
s.log.WithError(err).Error("unexpected error during sync update reply")
}
return
}
case e, ok := <-s.subscription.OnEventCh():
if !ok {
@ -449,17 +495,6 @@ func (s *Service) rebuildConnectors() error {
return nil
}
func (s *Service) onSyncUpdate(ctx context.Context, syncUpdate syncUpdate) {
c, ok := s.connectors[syncUpdate.addrID]
if !ok {
s.log.Warningf("Received syncUpdate for unknown addr (%v), connector may have been removed", syncUpdate.addrID)
syncUpdate.update.Done(fmt.Errorf("undeliverable"))
return
}
c.publishUpdate(ctx, syncUpdate.update)
}
func (s *Service) addConnectorsToServer(ctx context.Context, connectors map[string]*Connector) error {
addedConnectors := make([]string, 0, len(connectors))
for _, c := range connectors {
@ -490,7 +525,7 @@ func (s *Service) removeConnectorsFromServer(ctx context.Context, connectors map
return nil
}
func (s *Service) setAddressMode(ctx context.Context, handler *syncHandler, mode usertypes.AddressMode) error {
func (s *Service) setAddressMode(ctx context.Context, mode usertypes.AddressMode) error {
if s.addressMode == mode {
return nil
}
@ -502,13 +537,13 @@ func (s *Service) setAddressMode(ctx context.Context, handler *syncHandler, mode
s.log.Info("Setting Combined Address Mode")
}
handler.CancelAndWait()
s.syncHandler.CancelAndWait()
if err := s.removeConnectorsFromServer(ctx, s.connectors, true); err != nil {
return err
}
if err := s.syncStateProvider.ClearSyncStatus(); err != nil {
if err := s.syncStateProvider.ClearSyncStatus(ctx); err != nil {
return fmt.Errorf("failed to clear sync status:%w", err)
}
@ -520,7 +555,7 @@ func (s *Service) setAddressMode(ctx context.Context, handler *syncHandler, mode
return err
}
handler.launch(s)
s.startSyncing()
return nil
}
@ -537,6 +572,10 @@ func (s *Service) setShowAllMail(v bool) {
}
}
func (s *Service) startSyncing() {
s.syncHandler.Execute(s.syncReporter, s.labels.GetLabelMap(), s.syncUpdateApplier, s.syncMessageBuilder, syncservice.DefaultRetryCoolDown)
}
type resyncReq struct{}
type cancelSyncReq struct{}
@ -556,3 +595,9 @@ type showAllMailReq struct{ v bool }
type setAddressModeReq struct {
mode usertypes.AddressMode
}
type getSyncFailedMessagesReq struct{}
func getSyncConfigPath(path string, userID string) string {
return filepath.Join(path, fmt.Sprintf("sync-%v", userID))
}
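
Combined with getIMAPSyncConfigPath from locations.go earlier in this diff, each user's sync state ends up at <userConfig>/imap-sync/sync-<userID>. A small illustration; the base directory below is an example value, not a real path:

package sketch

import (
	"fmt"
	"path/filepath"
)

// Mirrors getSyncConfigPath above.
func getSyncConfigPath(path string, userID string) string {
	return filepath.Join(path, fmt.Sprintf("sync-%v", userID))
}

func Example() {
	// The locator creates <userConfig>/imap-sync with 0o700 permissions.
	dir := filepath.Join("/home/alice/.config/protonmail/bridge-v3", "imap-sync")
	fmt.Println(getSyncConfigPath(dir, "user-a"))
	// /home/alice/.config/protonmail/bridge-v3/imap-sync/sync-user-a
}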

View File

@ -136,7 +136,12 @@ func addNewAddressSplitMode(ctx context.Context, s *Service, addrID string) erro
s.connectors[connector.addrID] = connector
if err := syncLabels(ctx, s.labels.GetLabelMap(), connector); err != nil {
updates, err := syncLabels(ctx, s.labels.GetLabelMap(), []*Connector{connector})
if err != nil {
return fmt.Errorf("failed to create labels updates for new address: %w", err)
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return fmt.Errorf("failed to sync labels for new address: %w", err)
}

View File

@ -148,7 +148,7 @@ func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMet
if res.err != nil {
s.log.WithError(err).Error("Failed to build RFC822 message")
if err := s.syncStateProvider.AddFailedMessageID(message.ID); err != nil {
if err := s.syncStateProvider.AddFailedMessageID(ctx, message.ID); err != nil {
s.log.WithError(err).Error("Failed to add failed message ID to vault")
}
@ -157,7 +157,7 @@ func onMessageCreated(ctx context.Context, s *Service, message proton.MessageMet
return nil
}
if err := s.syncStateProvider.RemFailedMessageID(message.ID); err != nil {
if err := s.syncStateProvider.RemFailedMessageID(ctx, message.ID); err != nil {
s.log.WithError(err).Error("Failed to remove failed message ID from vault")
}
@ -211,7 +211,7 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me
if res.err != nil {
logrus.WithError(err).Error("Failed to build RFC822 message")
if err := s.syncStateProvider.AddFailedMessageID(event.ID); err != nil {
if err := s.syncStateProvider.AddFailedMessageID(ctx, event.ID); err != nil {
s.log.WithError(err).Error("Failed to add failed message ID to vault")
}
@ -220,7 +220,7 @@ func onMessageUpdateDraftOrSent(ctx context.Context, s *Service, event proton.Me
return nil
}
if err := s.syncStateProvider.RemFailedMessageID(event.ID); err != nil {
if err := s.syncStateProvider.RemFailedMessageID(ctx, event.ID); err != nil {
s.log.WithError(err).Error("Failed to remove failed message ID from vault")
}

View File

@ -1,127 +0,0 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"context"
"fmt"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/imap"
)
type syncUpdate struct {
addrID string
update imap.Update
}
type syncUpdater struct {
ch chan syncUpdate
}
type syncUpdatePublisher struct {
addrID string
updater *syncUpdater
}
func (s *syncUpdatePublisher) publishUpdate(ctx context.Context, update imap.Update) {
select {
case <-ctx.Done():
update.Done(fmt.Errorf("not applied: %w", ctx.Err()))
return
case s.updater.ch <- syncUpdate{addrID: s.addrID, update: update}:
}
}
func newSyncUpdater() *syncUpdater {
return &syncUpdater{ch: make(chan syncUpdate)}
}
func (s *syncUpdater) createPublisher(addrID string) *syncUpdatePublisher {
return &syncUpdatePublisher{updater: s, addrID: addrID}
}
func (s *syncUpdater) Close() {
close(s.ch)
}
type syncHandler struct {
group *async.Group
updater *syncUpdater
syncFinishedCh chan error
}
func newSyncHandler(ctx context.Context, handler async.PanicHandler) *syncHandler {
return &syncHandler{
group: async.NewGroup(ctx, handler),
updater: newSyncUpdater(),
syncFinishedCh: make(chan error, 2),
}
}
func (s *syncHandler) Close() {
s.group.CancelAndWait()
close(s.syncFinishedCh)
}
func (s *syncHandler) CancelAndWait() {
s.group.CancelAndWait()
}
func (s *syncHandler) Cancel() {
s.group.Cancel()
}
func (s *syncHandler) OnSyncFinishedCH() <-chan error {
return s.syncFinishedCh
}
func (s *syncHandler) launch(service *Service) {
service.eventProvider.PauseIMAP()
labels := service.labels.GetLabelMap()
updaters := make(map[string]updatePublisher, len(service.connectors))
for _, c := range service.connectors {
updaters[c.addrID] = s.updater.createPublisher(c.addrID)
}
state := &syncJob{
client: service.client,
userID: service.identityState.UserID(),
labels: labels,
updaters: updaters,
addressMode: service.addressMode,
syncState: service.syncStateProvider,
eventPublisher: service.eventPublisher,
log: service.log,
// We make a copy of the identity state to avoid holding on to locks for a very long time.
identityState: service.identityState.Clone(),
panicHandler: service.panicHandler,
reporter: service.reporter,
maxSyncMemory: service.maxSyncMemory,
keyProvider: service.identityState.keyPassProvider,
}
s.group.Once(func(ctx context.Context) {
err := state.run(ctx)
s.syncFinishedCh <- err
})
}

View File

@ -95,6 +95,13 @@ func (r *rwIdentity) WithAddrKR(addrID string, fn func(userKR *crypto.KeyRing, a
return r.identity.WithAddrKR(addrID, r.keyPassProvider.KeyPass(), fn)
}
func (r *rwIdentity) WithAddrKRs(fn func(*crypto.KeyRing, map[string]*crypto.KeyRing) error) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.identity.WithAddrKRs(r.keyPassProvider.KeyPass(), fn)
}
func (r *rwIdentity) CheckAuth(email string, password []byte, telemetry Telemetry) (string, error) {
r.lock.RLock()
defer r.lock.RUnlock()

View File

@ -1,203 +0,0 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"context"
"fmt"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
"github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
"github.com/bradenaw/juniper/xslices"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
type updatePublisher interface {
publishUpdate(ctx context.Context, update imap.Update)
}
type syncJob struct {
client APIClient
userID string
labels labelMap
updaters map[string]updatePublisher
addressMode usertypes.AddressMode
syncState SyncStateProvider
eventPublisher events.EventPublisher
log *logrus.Entry
identityState *useridentity.State
panicHandler async.PanicHandler
reporter reporter.Reporter
keyProvider useridentity.KeyPassProvider
maxSyncMemory uint64
}
const SyncRetryCoolDown = 20 * time.Second
func (s *syncJob) run(ctx context.Context) error {
s.log.Info("Sync triggered")
s.eventPublisher.PublishEvent(ctx, events.SyncStarted{UserID: s.userID})
if s.syncState.GetSyncStatus().IsComplete() {
s.log.Info("Sync already complete, only system labels will be updated")
if err := s.syncSystemLabels(ctx); err != nil {
s.log.WithError(err).Error("Failed to sync system labels")
s.eventPublisher.PublishEvent(ctx, events.SyncFailed{
UserID: s.userID,
Error: err,
})
return err
}
s.eventPublisher.PublishEvent(ctx, events.SyncFinished{UserID: s.userID})
return nil
}
for {
if err := ctx.Err(); err != nil {
s.log.WithError(err).Error("Sync aborted")
return fmt.Errorf("sync aborted: %w", ctx.Err())
} else if err := s.doSync(ctx); err != nil {
s.log.WithError(err).Error("Failed to sync, will retry later")
sleepCtx(ctx, SyncRetryCoolDown)
} else {
break
}
}
return nil
}
func (s *syncJob) syncSystemLabels(ctx context.Context) error {
var updates []imap.Update
for _, label := range s.labels {
if !WantLabel(label) {
continue
}
for _, connector := range s.updaters {
update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name)
connector.publishUpdate(ctx, update)
updates = append(updates, update)
}
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return fmt.Errorf("could not sync system labels: %w", err)
}
return nil
}
func (s *syncJob) doSync(ctx context.Context) error {
start := time.Now()
s.log.WithField("start", start).Info("Beginning user sync")
s.eventPublisher.PublishEvent(ctx, events.SyncStarted{
UserID: s.userID,
})
if err := s.sync(ctx); err != nil {
s.log.WithError(err).Warn("Failed to sync user")
s.eventPublisher.PublishEvent(ctx, events.SyncFailed{
UserID: s.userID,
Error: err,
})
return fmt.Errorf("failed to sync: %w", err)
}
s.log.WithField("duration", time.Since(start)).Info("Finished user sync")
s.eventPublisher.PublishEvent(ctx, events.SyncFinished{
UserID: s.userID,
})
return nil
}
func (s *syncJob) sync(ctx context.Context) error {
syncStatus := s.syncState.GetSyncStatus()
if !syncStatus.HasLabels {
s.log.Info("Syncing labels")
if err := syncLabels(ctx, s.labels, maps.Values(s.updaters)...); err != nil {
return fmt.Errorf("failed to sync labels: %w", err)
}
if err := s.syncState.SetHasLabels(true); err != nil {
return fmt.Errorf("failed to set has labels: %w", err)
}
s.log.Info("Synced labels")
}
if !syncStatus.HasMessages {
s.log.Info("Syncing messages")
// Determine which messages to sync.
messageIDs, err := s.client.GetAllMessageIDs(ctx, "")
if err != nil {
return fmt.Errorf("failed to get message IDs to sync: %w", err)
}
s.log.Debugf("User has the following failed synced message ids: %v", syncStatus.FailedMessageIDs)
// Remove any messages that have already failed to sync.
messageIDs = xslices.Filter(messageIDs, func(messageID string) bool {
return !slices.Contains(syncStatus.FailedMessageIDs, messageID)
})
// Reverse the order of the message IDs so that the newest messages are synced first.
xslices.Reverse(messageIDs)
// If we have a message ID that we've already synced, then we can skip all messages before it.
if idx := xslices.Index(messageIDs, syncStatus.LastMessageID); idx >= 0 {
messageIDs = messageIDs[idx+1:]
}
// Sync the messages.
if err := s.syncMessages(
ctx,
messageIDs,
); err != nil {
return fmt.Errorf("failed to sync messages: %w", err)
}
if err := s.syncState.SetHasMessages(true); err != nil {
return fmt.Errorf("failed to set has messages: %w", err)
}
s.log.Info("Synced messages")
} else {
s.log.Info("Messages are already synced, skipping")
}
return nil
}

View File

@ -1,115 +0,0 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"bytes"
"context"
"fmt"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/logging"
"github.com/ProtonMail/go-proton-api"
)
type attachmentResult struct {
attachment []byte
err error
}
type attachmentJob struct {
id string
size int64
result chan attachmentResult
}
type attachmentDownloader struct {
workerCh chan attachmentJob
cancel context.CancelFunc
}
func attachmentWorker(ctx context.Context, client APIClient, work <-chan attachmentJob) {
for {
select {
case <-ctx.Done():
return
case job, ok := <-work:
if !ok {
return
}
var b bytes.Buffer
b.Grow(int(job.size))
err := client.GetAttachmentInto(ctx, job.id, &b)
select {
case <-ctx.Done():
close(job.result)
return
case job.result <- attachmentResult{attachment: b.Bytes(), err: err}:
close(job.result)
}
}
}
}
func (s *syncJob) newAttachmentDownloader(ctx context.Context, client APIClient, workerCount int) *attachmentDownloader {
workerCh := make(chan attachmentJob, (workerCount+2)*workerCount)
ctx, cancel := context.WithCancel(ctx)
for i := 0; i < workerCount; i++ {
workerCh = make(chan attachmentJob)
async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) { attachmentWorker(ctx, client, workerCh) }, logging.Labels{
"sync": fmt.Sprintf("att-downloader %v", i),
})
}
return &attachmentDownloader{
workerCh: workerCh,
cancel: cancel,
}
}
func (a *attachmentDownloader) getAttachments(ctx context.Context, attachments []proton.Attachment) ([][]byte, error) {
resultChs := make([]chan attachmentResult, len(attachments))
for i, id := range attachments {
resultChs[i] = make(chan attachmentResult, 1)
select {
case a.workerCh <- attachmentJob{id: id.ID, result: resultChs[i], size: id.Size}:
case <-ctx.Done():
return nil, ctx.Err()
}
}
result := make([][]byte, len(attachments))
var err error
for i := 0; i < len(attachments); i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
case r := <-resultChs[i]:
if r.err != nil {
err = fmt.Errorf("failed to get attachment %v: %w", attachments[i], r.err)
}
result[i] = r.attachment
}
}
return result, err
}
func (a *attachmentDownloader) close() {
a.cancel()
}

View File

@ -24,8 +24,6 @@ import (
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/rfc822"
"github.com/ProtonMail/go-proton-api"
"github.com/bradenaw/juniper/xslices"
"github.com/stretchr/testify/require"
)
@ -49,32 +47,3 @@ func TestNewFailedMessageLiteral(t *testing.T) {
require.Equal(t, `("text" "plain" () NIL NIL "base64" 114 2)`, parsed.Body)
require.Equal(t, `("text" "plain" () NIL NIL "base64" 114 2 NIL NIL NIL NIL)`, parsed.Structure)
}
func TestSyncChunkSyncBuilderBatch(t *testing.T) {
// GODT-2424 - Some messages were not fully built due to a bug in the chunking when the total memory used by the
// message was higher than the maximum we allowed.
const totalMessageCount = 100
msg := proton.FullMessage{
Message: proton.Message{
Attachments: []proton.Attachment{
{
Size: int64(8 * Megabyte),
},
},
},
AttData: nil,
}
messages := xslices.Repeat(msg, totalMessageCount)
chunks := chunkSyncBuilderBatch(messages, 16*Megabyte)
var totalMessagesInChunks int
for _, v := range chunks {
totalMessagesInChunks += len(v)
}
require.Equal(t, totalMessagesInChunks, totalMessageCount)
}

View File

@ -1,76 +0,0 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"context"
"fmt"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/go-proton-api"
)
// nolint:exhaustive
func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updatePublishers ...updatePublisher) error {
var updates []imap.Update
// Create placeholder Folders/Labels mailboxes with the \Noselect attribute.
for _, prefix := range []string{folderPrefix, labelPrefix} {
for _, updateCh := range updatePublishers {
update := newPlaceHolderMailboxCreatedUpdate(prefix)
updateCh.publishUpdate(ctx, update)
updates = append(updates, update)
}
}
// Sync the user's labels.
for labelID, label := range apiLabels {
if !WantLabel(label) {
continue
}
switch label.Type {
case proton.LabelTypeSystem:
for _, updateCh := range updatePublishers {
update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name)
updateCh.publishUpdate(ctx, update)
updates = append(updates, update)
}
case proton.LabelTypeFolder, proton.LabelTypeLabel:
for _, updateCh := range updatePublishers {
update := newMailboxCreatedUpdate(imap.MailboxID(labelID), GetMailboxName(label))
updateCh.publishUpdate(ctx, update)
updates = append(updates, update)
}
default:
return fmt.Errorf("unknown label type: %d", label.Type)
}
}
// Wait for all label updates to be applied.
for _, update := range updates {
err, ok := update.WaitContext(ctx)
if ok && err != nil {
return fmt.Errorf("failed to apply label create update in gluon %v: %w", update.String(), err)
}
}
return nil
}

View File

@ -0,0 +1,63 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"bytes"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
"github.com/ProtonMail/proton-bridge/v3/pkg/message"
)
type SyncMessageBuilder struct {
state *rwIdentity
}
func NewSyncMessageBuilder(rw *rwIdentity) *SyncMessageBuilder {
return &SyncMessageBuilder{state: rw}
}
func (s SyncMessageBuilder) WithKeys(f func(*crypto.KeyRing, map[string]*crypto.KeyRing) error) error {
return s.state.WithAddrKRs(f)
}
func (s SyncMessageBuilder) BuildMessage(
apiLabels map[string]proton.Label,
full proton.FullMessage,
addrKR *crypto.KeyRing,
buffer *bytes.Buffer,
) (syncservice.BuildResult, error) {
buffer.Grow(full.Size)
if err := message.BuildRFC822Into(addrKR, full.Message, full.AttData, defaultMessageJobOpts(), buffer); err != nil {
return syncservice.BuildResult{}, err
}
update, err := newMessageCreatedUpdate(apiLabels, full.MessageMetadata, buffer.Bytes())
if err != nil {
return syncservice.BuildResult{}, err
}
return syncservice.BuildResult{
AddressID: full.Message.AddressID,
MessageID: full.Message.ID,
Update: update,
}, nil
}
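
A hypothetical caller for the builder above, using only the two methods it exposes (the buildAll helper and its loop are invented for illustration): unlock the address keyrings once via WithKeys, then build every downloaded message inside the same callback.

package imapservice

import (
	"bytes"

	"github.com/ProtonMail/go-proton-api"
	"github.com/ProtonMail/gopenpgp/v2/crypto"
	"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
)

func buildAll(
	b *SyncMessageBuilder,
	apiLabels map[string]proton.Label,
	msgs []proton.FullMessage,
) ([]syncservice.BuildResult, error) {
	var results []syncservice.BuildResult

	err := b.WithKeys(func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error {
		for _, msg := range msgs {
			kr, ok := addrKRs[msg.Message.AddressID]
			if !ok {
				continue // the real sync records this as a failed message instead
			}
			res, err := b.BuildMessage(apiLabels, msg, kr, new(bytes.Buffer))
			if err != nil {
				return err
			}
			results = append(results, res)
		}
		return nil
	})

	return results, err
}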

View File

@ -1,523 +0,0 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"bytes"
"context"
"fmt"
"os"
"runtime"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/logging"
"github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/bradenaw/juniper/parallel"
"github.com/bradenaw/juniper/xslices"
"github.com/pbnjay/memory"
"github.com/sirupsen/logrus"
)
func (s *syncJob) syncMessages(ctx context.Context, messageIDs []string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Track the amount of time to process all the messages.
syncStartTime := time.Now()
defer func() { logrus.WithField("duration", time.Since(syncStartTime)).Info("Message sync completed") }()
s.log.WithFields(logrus.Fields{
"messages": len(messageIDs),
"numCPU": runtime.NumCPU(),
}).Info("Starting message sync")
// Create the flushers, one per update channel.
// Create a reporter to report sync progress updates.
syncReporter := newSyncReporter(s.userID, s.eventPublisher, len(messageIDs), time.Second)
defer syncReporter.done(ctx)
// Expected mem usage for this whole process should be the sum of MaxMessageBuildingMem and MaxDownloadRequestMem
// times x due to pipeline and all additional memory used by network requests and compression+io.
totalMemory := memory.TotalMemory()
syncLimits := newSyncLimits(s.maxSyncMemory)
if syncLimits.MaxSyncMemory >= totalMemory/2 {
logrus.Warnf("Requested max sync memory of %v MB is greater than half of system memory (%v MB), forcing to half of system memory",
toMB(syncLimits.MaxSyncMemory), toMB(totalMemory/2))
syncLimits.MaxSyncMemory = totalMemory / 2
}
if syncLimits.MaxSyncMemory < 800*Megabyte {
logrus.Warnf("Requested max sync memory of %v MB, but minimum recommended is 800 MB, forcing max syncMemory to 800MB", toMB(syncLimits.MaxSyncMemory))
syncLimits.MaxSyncMemory = 800 * Megabyte
}
logrus.Debugf("Total System Memory: %v", toMB(totalMemory))
// Linter says it's not used. This is a lie.
var syncMaxDownloadRequestMem uint64
// Linter says it's not used. This is a lie.
var syncMaxMessageBuildingMem uint64
// If less than 2GB available try and limit max memory to 512 MB
switch {
case syncLimits.MaxSyncMemory < 2*Gigabyte:
if syncLimits.MaxSyncMemory < 800*Megabyte {
logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes")
}
syncMaxDownloadRequestMem = syncLimits.MinDownloadRequestMem
syncMaxMessageBuildingMem = syncLimits.MinMessageBuildingMem
case syncLimits.MaxSyncMemory == 2*Gigabyte:
// Increasing the max download capacity has very little effect on sync speed. We could increase the download
// memory but the user would see less sync notifications. A smaller value here leads to more frequent
// updates. Additionally, most of sync time is spent in the message building.
syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem
// Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage.
syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem
default:
// Divide by 8 as download stage and build stage will use approx. 4x the specified memory.
remainingMemory := (syncLimits.MaxSyncMemory - 2*Gigabyte) / 8
syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem + remainingMemory
syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem + remainingMemory
}
logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB",
toMB(syncMaxDownloadRequestMem),
toMB(syncMaxMessageBuildingMem),
toMB((syncMaxMessageBuildingMem*4)+(syncMaxDownloadRequestMem*4)),
)
downloadCh := startMetadataDownloader(ctx, s, messageIDs, syncMaxDownloadRequestMem)
buildCh, errorCh := startMessageDownloader(ctx, s, syncLimits, downloadCh)
flushCh := startMessageBuilder(ctx, s, buildCh, syncMaxMessageBuildingMem)
flushUpdateCh := startMessageFlusher(ctx, s, flushCh)
for flushUpdate := range flushUpdateCh {
if flushUpdate.err != nil {
return flushUpdate.err
}
if err := s.syncState.SetLastMessageID(flushUpdate.messageID); err != nil {
return fmt.Errorf("failed to set last synced message ID: %w", err)
}
syncReporter.add(ctx, flushUpdate.batchLen)
}
return <-errorCh
}
const Kilobyte = uint64(1024)
const Megabyte = 1024 * Kilobyte
const Gigabyte = 1024 * Megabyte
func toMB(v uint64) float64 {
return float64(v) / float64(Megabyte)
}
type syncLimits struct {
MaxDownloadRequestMem uint64
MinDownloadRequestMem uint64
MaxMessageBuildingMem uint64
MinMessageBuildingMem uint64
MaxSyncMemory uint64
MaxParallelDownloads int
}
func newSyncLimits(maxSyncMemory uint64) syncLimits {
limits := syncLimits{
// There's no point in using more than 128MB of download data per stage, after that we reach a point of diminishing
// returns as we can't keep the pipeline fed fast enough.
MaxDownloadRequestMem: 128 * Megabyte,
// Any lower than this and we may fail to download messages.
MinDownloadRequestMem: 40 * Megabyte,
// This value can be increased to your heart's content. The more system memory the user has, the more messages
// we can build in parallel.
MaxMessageBuildingMem: 128 * Megabyte,
MinMessageBuildingMem: 64 * Megabyte,
// Maximum recommended value for parallel downloads, per the API team.
MaxParallelDownloads: 20,
MaxSyncMemory: maxSyncMemory,
}
if _, ok := os.LookupEnv("BRIDGE_SYNC_FORCE_MINIMUM_SPEC"); ok {
logrus.Warn("Sync specs forced to minimum")
limits.MaxDownloadRequestMem = 50 * Megabyte
limits.MaxMessageBuildingMem = 80 * Megabyte
limits.MaxParallelDownloads = 2
limits.MaxSyncMemory = 800 * Megabyte
}
return limits
}
func chunkSyncBuilderBatch(batch []proton.FullMessage, maxMemory uint64) [][]proton.FullMessage {
var expectedMemUsage uint64
var chunks [][]proton.FullMessage
var lastIndex int
var index int
for _, v := range batch {
var dataSize uint64
for _, a := range v.Attachments {
dataSize += uint64(a.Size)
}
// 2x increase for attachment due to extra memory needed for decrypting and writing
// in memory buffer.
dataSize *= 2
dataSize += uint64(len(v.Body))
nextMemSize := expectedMemUsage + dataSize
if nextMemSize >= maxMemory {
chunks = append(chunks, batch[lastIndex:index])
lastIndex = index
expectedMemUsage = dataSize
} else {
expectedMemUsage = nextMemSize
}
index++
}
if lastIndex < len(batch) {
chunks = append(chunks, batch[lastIndex:])
}
return chunks
}
type flushUpdate struct {
messageID string
err error
batchLen int
}
type downloadRequest struct {
ids []string
expectedSize uint64
err error
}
type downloadedMessageBatch struct {
batch []proton.FullMessage
}
type builtMessageBatch struct {
batch []*buildRes
}
func startMetadataDownloader(ctx context.Context, s *syncJob, messageIDs []string, syncMaxDownloadRequestMem uint64) <-chan downloadRequest {
downloadCh := make(chan downloadRequest)
// Go routine in charge of downloading message metadata
async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) {
defer close(downloadCh)
const MetadataDataPageSize = 150
var downloadReq downloadRequest
downloadReq.ids = make([]string, 0, MetadataDataPageSize)
metadataChunks := xslices.Chunk(messageIDs, MetadataDataPageSize)
for i, metadataChunk := range metadataChunks {
logrus.Debugf("Metadata Request (%v of %v), previous: %v", i, len(metadataChunks), len(downloadReq.ids))
metadata, err := s.client.GetMessageMetadataPage(ctx, 0, len(metadataChunk), proton.MessageFilter{ID: metadataChunk})
if err != nil {
logrus.WithError(err).Errorf("Failed to download message metadata for chunk %v", i)
downloadReq.err = err
select {
case downloadCh <- downloadReq:
case <-ctx.Done():
return
}
return
}
if ctx.Err() != nil {
return
}
// Build look up table so that messages are processed in the same order.
metadataMap := make(map[string]int, len(metadata))
for i, v := range metadata {
metadataMap[v.ID] = i
}
for i, id := range metadataChunk {
m := &metadata[metadataMap[id]]
nextSize := downloadReq.expectedSize + uint64(m.Size)
if nextSize >= syncMaxDownloadRequestMem || len(downloadReq.ids) >= 256 {
logrus.Debugf("Download Request Sent at %v of %v", i, len(metadata))
select {
case downloadCh <- downloadReq:
case <-ctx.Done():
return
}
downloadReq.expectedSize = 0
downloadReq.ids = make([]string, 0, MetadataDataPageSize)
nextSize = uint64(m.Size)
}
downloadReq.ids = append(downloadReq.ids, id)
downloadReq.expectedSize = nextSize
}
}
if len(downloadReq.ids) != 0 {
logrus.Debugf("Sending remaining download request")
select {
case downloadCh <- downloadReq:
case <-ctx.Done():
return
}
}
}, logging.Labels{"sync-stage": "meta-data"})
return downloadCh
}
func startMessageDownloader(ctx context.Context, s *syncJob, syncLimits syncLimits, downloadCh <-chan downloadRequest) (<-chan downloadedMessageBatch, <-chan error) {
buildCh := make(chan downloadedMessageBatch)
errorCh := make(chan error, syncLimits.MaxParallelDownloads*4)
// Goroutine in charge of downloading and building messages in maxBatchSize batches.
async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) {
defer close(buildCh)
defer close(errorCh)
defer func() {
logrus.Debugf("sync downloader exit")
}()
attachmentDownloader := s.newAttachmentDownloader(ctx, s.client, syncLimits.MaxParallelDownloads)
defer attachmentDownloader.close()
for request := range downloadCh {
logrus.Debugf("Download request: %v MB:%v", len(request.ids), toMB(request.expectedSize))
if request.err != nil {
errorCh <- request.err
return
}
if ctx.Err() != nil {
errorCh <- ctx.Err()
return
}
result, err := parallel.MapContext(ctx, syncLimits.MaxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) {
defer async.HandlePanic(s.panicHandler)
var result proton.FullMessage
msg, err := s.client.GetMessage(ctx, id)
if err != nil {
logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message")
return proton.FullMessage{}, err
}
attachments, err := attachmentDownloader.getAttachments(ctx, msg.Attachments)
if err != nil {
logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message attachments")
return proton.FullMessage{}, err
}
result.Message = msg
result.AttData = attachments
return result, nil
})
if err != nil {
errorCh <- err
return
}
select {
case buildCh <- downloadedMessageBatch{
batch: result,
}:
case <-ctx.Done():
return
}
}
}, logging.Labels{"sync-stage": "download"})
return buildCh, errorCh
}
func startMessageBuilder(ctx context.Context, s *syncJob, buildCh <-chan downloadedMessageBatch, syncMaxMessageBuildingMem uint64) <-chan builtMessageBatch {
flushCh := make(chan builtMessageBatch)
// Goroutine which builds messages after they have been downloaded
async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) {
defer close(flushCh)
defer func() {
logrus.Debugf("sync builder exit")
}()
if err := s.identityState.WithAddrKRs(s.keyProvider.KeyPass(), func(_ *crypto.KeyRing, addrKRs map[string]*crypto.KeyRing) error {
maxMessagesInParallel := runtime.NumCPU()
for buildBatch := range buildCh {
if ctx.Err() != nil {
return ctx.Err()
}
chunks := chunkSyncBuilderBatch(buildBatch.batch, syncMaxMessageBuildingMem)
for index, chunk := range chunks {
logrus.Debugf("Build request: %v of %v count=%v", index, len(chunks), len(chunk))
result, err := parallel.MapContext(ctx, maxMessagesInParallel, chunk, func(ctx context.Context, msg proton.FullMessage) (*buildRes, error) {
defer async.HandlePanic(s.panicHandler)
kr, ok := addrKRs[msg.AddressID]
if !ok {
logrus.Errorf("Address '%v' on message '%v' does not have an unlocked kerying", msg.AddressID, msg.ID)
return &buildRes{
messageID: msg.ID,
addressID: msg.AddressID,
err: fmt.Errorf("address does not have an unlocked keyring"),
}, nil
}
res := buildRFC822(s.labels, msg, kr, new(bytes.Buffer))
if res.err != nil {
s.log.WithError(res.err).WithField("msgID", msg.ID).Error("Failed to build message (sync)")
}
return res, nil
})
if err != nil {
return err
}
select {
case flushCh <- builtMessageBatch{result}:
case <-ctx.Done():
return nil
}
}
}
return nil
}); err != nil {
s.log.WithError(err).Error("Sync message builder exited with error")
}
}, logging.Labels{"sync-stage": "builder"})
return flushCh
}
func startMessageFlusher(ctx context.Context, s *syncJob, messageBatchCH <-chan builtMessageBatch) <-chan flushUpdate {
flushUpdateCh := make(chan flushUpdate)
// Goroutine which converts the messages into updates and builds a waitable structure for progress tracking.
async.GoAnnotated(ctx, s.panicHandler, func(ctx context.Context) {
defer close(flushUpdateCh)
defer func() {
logrus.Debugf("sync flush exit")
}()
type updateTargetInfo struct {
queueIndex int
ch updatePublisher
}
pendingUpdates := make([][]*imap.MessageCreated, len(s.updaters))
addressToIndex := make(map[string]updateTargetInfo)
{
i := 0
for addrID, updateCh := range s.updaters {
addressToIndex[addrID] = updateTargetInfo{
ch: updateCh,
queueIndex: i,
}
i++
}
}
for downloadBatch := range messageBatchCH {
logrus.Debugf("Flush batch: %v", len(downloadBatch.batch))
for _, res := range downloadBatch.batch {
if res.err != nil {
if err := s.syncState.AddFailedMessageID(res.messageID); err != nil {
logrus.WithError(err).Error("Failed to add failed message ID")
}
if err := s.reporter.ReportMessageWithContext("Failed to build message (sync)", reporter.Context{
"messageID": res.messageID,
"error": res.err,
}); err != nil {
s.log.WithError(err).Error("Failed to report message build error")
}
// We could sync a placeholder message here, but for now we skip it entirely.
continue
}
if err := s.syncState.RemFailedMessageID(res.messageID); err != nil {
logrus.WithError(err).Error("Failed to remove failed message ID")
}
targetInfo := addressToIndex[res.addressID]
pendingUpdates[targetInfo.queueIndex] = append(pendingUpdates[targetInfo.queueIndex], res.update)
}
for _, info := range addressToIndex {
up := imap.NewMessagesCreated(true, pendingUpdates[info.queueIndex]...)
info.ch.publishUpdate(ctx, up)
err, ok := up.WaitContext(ctx)
if ok && err != nil {
flushUpdateCh <- flushUpdate{
err: fmt.Errorf("failed to apply sync update to gluon %v: %w", up.String(), err),
}
return
}
pendingUpdates[info.queueIndex] = pendingUpdates[info.queueIndex][:0]
}
select {
case flushUpdateCh <- flushUpdate{
messageID: downloadBatch.batch[0].messageID,
err: nil,
batchLen: len(downloadBatch.batch),
}:
case <-ctx.Done():
return
}
}
}, logging.Labels{"sync-stage": "flush"})
return flushUpdateCh
}

View File

@ -29,25 +29,32 @@ type syncReporter struct {
eventPublisher events.EventPublisher
start time.Time
total int
count int
total int64
count int64
last time.Time
freq time.Duration
}
func newSyncReporter(userID string, eventsPublisher events.EventPublisher, total int, freq time.Duration) *syncReporter {
return &syncReporter{
userID: userID,
eventPublisher: eventsPublisher,
start: time.Now(),
total: total,
freq: freq,
}
func (rep *syncReporter) OnStart(ctx context.Context) {
rep.start = time.Now()
rep.eventPublisher.PublishEvent(ctx, events.SyncStarted{UserID: rep.userID})
}
func (rep *syncReporter) add(ctx context.Context, delta int) {
func (rep *syncReporter) OnFinished(ctx context.Context) {
rep.eventPublisher.PublishEvent(ctx, events.SyncFinished{
UserID: rep.userID,
})
}
func (rep *syncReporter) OnError(ctx context.Context, err error) {
rep.eventPublisher.PublishEvent(ctx, events.SyncFailed{
UserID: rep.userID,
Error: err,
})
}
func (rep *syncReporter) OnProgress(ctx context.Context, delta int64) {
rep.count += delta
if time.Since(rep.last) > rep.freq {
@ -62,11 +69,17 @@ func (rep *syncReporter) add(ctx context.Context, delta int) {
}
}
func (rep *syncReporter) done(ctx context.Context) {
rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{
UserID: rep.userID,
Progress: 1,
Elapsed: time.Since(rep.start),
Remaining: 0,
})
func (rep *syncReporter) InitializeProgressCounter(_ context.Context, current int64, total int64) {
rep.count = current
rep.total = total
}
func newSyncReporter(userID string, eventsPublisher events.EventPublisher, freq time.Duration) *syncReporter {
return &syncReporter{
userID: userID,
eventPublisher: eventsPublisher,
start: time.Now(),
freq: freq,
}
}
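
The reporter now implements the sync service's progress callbacks rather than the old add/done pair. A hypothetical driver showing the intended call order; everything here except the five method signatures above is invented:

package sketch

import "context"

// progressObserver lists the callbacks syncReporter implements; the real
// interface is defined in the syncservice package.
type progressObserver interface {
	OnStart(ctx context.Context)
	InitializeProgressCounter(ctx context.Context, current, total int64)
	OnProgress(ctx context.Context, delta int64)
	OnFinished(ctx context.Context)
	OnError(ctx context.Context, err error)
}

// driveSync reports progress while flushing message batches.
func driveSync(ctx context.Context, rep progressObserver, batches [][]string, synced, total int64) error {
	rep.OnStart(ctx)
	rep.InitializeProgressCounter(ctx, synced, total)

	for _, batch := range batches {
		if err := ctx.Err(); err != nil {
			rep.OnError(ctx, err)
			return err
		}
		// ... download, build, and apply the batch to gluon here ...
		rep.OnProgress(ctx, int64(len(batch)))
	}

	rep.OnFinished(ctx)
	return nil
}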

View File

@ -0,0 +1,253 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"sync"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
"github.com/bradenaw/juniper/xmaps"
)
type SyncState struct {
filePath string
status syncservice.Status
lock sync.Mutex
}
var ErrInvalidSyncFileVersion = errors.New("invalid sync file version")
const SyncFileVersion = 1
type syncStateFile struct {
Version int
Data string
}
type syncFileVersion1 struct {
Status syncservice.Status
}
func NewSyncState(filePath string) (*SyncState, error) {
s := &SyncState{filePath: filePath, status: syncservice.DefaultStatus()}
if err := s.loadUnsafe(); err != nil {
return nil, err
}
return s, nil
}
func (s *SyncState) AddFailedMessageID(_ context.Context, ids ...string) error {
s.lock.Lock()
defer s.lock.Unlock()
count := len(s.status.FailedMessages)
for _, id := range ids {
s.status.FailedMessages.Add(id)
}
// Only update if something changed.
if count == len(s.status.FailedMessages) {
return nil
}
return s.storeUnsafe()
}
func (s *SyncState) RemFailedMessageID(_ context.Context, ids ...string) error {
s.lock.Lock()
defer s.lock.Unlock()
count := len(s.status.FailedMessages)
for _, id := range ids {
s.status.FailedMessages.Remove(id)
}
// Only update if something changed.
if count == len(s.status.FailedMessages) {
return nil
}
return s.storeUnsafe()
}
func (s *SyncState) GetSyncStatus(_ context.Context) (syncservice.Status, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.status, nil
}
func (s *SyncState) ClearSyncStatus(_ context.Context) error {
s.lock.Lock()
defer s.lock.Unlock()
oldStatus := s.status
s.status = syncservice.DefaultStatus()
if err := s.storeUnsafe(); err != nil {
s.status = oldStatus
return err
}
return nil
}
func (s *SyncState) SetHasLabels(_ context.Context, b bool) error {
s.lock.Lock()
defer s.lock.Unlock()
s.status.HasLabels = b
return s.storeUnsafe()
}
func (s *SyncState) SetHasMessages(_ context.Context, b bool) error {
s.lock.Lock()
defer s.lock.Unlock()
s.status.HasMessages = b
return s.storeUnsafe()
}
func (s *SyncState) SetLastMessageID(_ context.Context, messageID string, count int64) error {
s.lock.Lock()
defer s.lock.Unlock()
s.status.LastSyncedMessageID = messageID
s.status.NumSyncedMessages += count
return s.storeUnsafe()
}
func (s *SyncState) SetMessageCount(_ context.Context, i int64) error {
s.lock.Lock()
defer s.lock.Unlock()
s.status.TotalMessageCount = i
s.status.HasMessageCount = true
return s.storeUnsafe()
}
func (s *SyncState) storeUnsafe() error {
return storeImpl(&s.status, s.filePath)
}
func storeImpl(status *syncservice.Status, path string) error {
data, err := json.Marshal(syncFileVersion1{Status: *status})
if err != nil {
return fmt.Errorf("failed to marshal sync state data: %w", err)
}
syncFile := syncStateFile{
Version: SyncFileVersion,
Data: string(data),
}
syncFileData, err := json.Marshal(syncFile)
if err != nil {
return fmt.Errorf("failde to marshal sync state file: %w", err)
}
tmpFile := path + ".tmp"
if err := os.WriteFile(tmpFile, syncFileData, 0o600); err != nil {
return fmt.Errorf("failed to write sync state to tmp file: %w", err)
}
if err := os.Rename(tmpFile, path); err != nil {
return fmt.Errorf("failed to update sync state: %w", err)
}
return nil
}
func (s *SyncState) loadUnsafe() error {
data, err := os.ReadFile(s.filePath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil
}
return err
}
var syncFile syncStateFile
if err := json.Unmarshal(data, &syncFile); err != nil {
return fmt.Errorf("failed to unmarshal sync file: %w", err)
}
if syncFile.Version != SyncFileVersion {
return ErrInvalidSyncFileVersion
}
var v1 syncFileVersion1
if err := json.Unmarshal([]byte(syncFile.Data), &v1); err != nil {
return fmt.Errorf("failed to unmarshal sync data: %w", err)
}
s.status = v1.Status
return nil
}
func DeleteSyncState(configDir, userID string) error {
path := getSyncConfigPath(configDir, userID)
return os.Remove(path)
}
func MigrateVaultSettings(
configDir, userID string,
hasLabels, hasMessages bool,
failedMessageIDs []string,
) (bool, error) {
filePath := getSyncConfigPath(configDir, userID)
_, err := os.ReadFile(filePath) //nolint:gosec
if err == nil {
// File already exists, sync has been migrated.
return false, nil
}
if err != nil && !errors.Is(err, os.ErrNotExist) {
// Unexpected error occurred.
return false, err
}
status := syncservice.DefaultStatus()
status.HasLabels = hasLabels
status.HasMessages = hasMessages
status.HasMessageCount = hasMessages
status.FailedMessages = xmaps.SetFromSlice(failedMessageIDs)
return true, storeImpl(&status, filePath)
}

View File

@ -0,0 +1,80 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"context"
"testing"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
"github.com/bradenaw/juniper/xmaps"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)
func TestMigrateSyncSettings_AlreadyExists(t *testing.T) {
tmpDir := t.TempDir()
testFile := getSyncConfigPath(tmpDir, "test")
expected, err := generateTestState(testFile)
require.NoError(t, err)
migrated, err := MigrateVaultSettings(tmpDir, "test", true, true, nil)
require.NoError(t, err)
require.False(t, migrated)
state, err := NewSyncState(testFile)
require.NoError(t, err)
status, err := state.GetSyncStatus(context.Background())
require.NoError(t, err)
require.Equal(t, expected, status)
}
func TestMigrateSyncSettings_DoesNotExist(t *testing.T) {
tmpDir := t.TempDir()
failedIDs := []string{"foo", "bar"}
migrated, err := MigrateVaultSettings(tmpDir, "test", true, true, failedIDs)
require.NoError(t, err)
require.True(t, migrated)
state, err := NewSyncState(getSyncConfigPath(tmpDir, "test"))
require.NoError(t, err)
status, err := state.GetSyncStatus(context.Background())
require.NoError(t, err)
require.Zero(t, status.NumSyncedMessages)
require.Zero(t, status.TotalMessageCount)
require.Empty(t, status.LastSyncedMessageID)
require.ElementsMatch(t, failedIDs, maps.Keys(status.FailedMessages))
require.True(t, status.HasLabels)
require.True(t, status.HasMessageCount)
require.True(t, status.HasMessages)
}
func generateTestState(path string) (syncservice.Status, error) {
status := syncservice.DefaultStatus()
status.HasMessages = true
status.HasLabels = false
status.FailedMessages = xmaps.SetFromSlice([]string{"foo", "bar"})
status.TotalMessageCount = 1204
status.NumSyncedMessages = 100
return status, storeImpl(&status, path)
}

View File

@ -0,0 +1,234 @@
// Copyright (c) 2023 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package imapservice
import (
"context"
"fmt"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
"github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
"github.com/bradenaw/juniper/xslices"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
)
type SyncUpdateApplier struct {
requestCh chan updateRequest
replyCh chan updateReply
}
type updateReply struct {
updates []imap.Update
err error
}
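// updateRequest is a closure handed over requestCh to whichever goroutine
// owns the user's address mode and connector map; its outcome travels back
// on replyCh.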
type updateRequest = func(ctx context.Context, mode usertypes.AddressMode, connectors map[string]*Connector) ([]imap.Update, error)
func NewSyncUpdateApplier() *SyncUpdateApplier {
return &SyncUpdateApplier{
requestCh: make(chan updateRequest),
replyCh: make(chan updateReply),
}
}
func (s *SyncUpdateApplier) Close() {
close(s.requestCh)
close(s.replyCh)
}
func (s *SyncUpdateApplier) ApplySyncUpdates(ctx context.Context, updates []syncservice.BuildResult) error {
request := func(ctx context.Context, mode usertypes.AddressMode, connectors map[string]*Connector) ([]imap.Update, error) {
if mode == usertypes.AddressModeCombined {
if len(connectors) != 1 {
return nil, fmt.Errorf("unexpected connecto list state")
}
c := maps.Values(connectors)[0]
update := imap.NewMessagesCreated(true, xslices.Map(updates, func(b syncservice.BuildResult) *imap.MessageCreated {
return b.Update
})...)
c.publishUpdate(ctx, update)
return []imap.Update{update}, nil
}
updateMap := make(map[string]*imap.MessagesCreated, len(connectors))
result := make([]imap.Update, 0, len(connectors))
for _, up := range updates {
update, ok := updateMap[up.AddressID]
if !ok {
update = imap.NewMessagesCreated(true)
updateMap[up.AddressID] = update
result = append(result, update)
}
update.Messages = append(update.Messages, up.Update)
}
for addrID, update := range updateMap {
c, ok := connectors[addrID]
if !ok {
logrus.Warnf("Could not find connector for address %v", addrID)
continue
}
c.publishUpdate(ctx, update)
}
return result, nil
}
result, err := s.sendRequest(ctx, request)
if err != nil {
return err
}
if err := waitOnIMAPUpdates(ctx, result); err != nil {
return fmt.Errorf("could not apply updates: %w", err)
}
return nil
}
func (s *SyncUpdateApplier) SyncSystemLabelsOnly(ctx context.Context, labels map[string]proton.Label) error {
request := func(ctx context.Context, _ usertypes.AddressMode, connectors map[string]*Connector) ([]imap.Update, error) {
updates := make([]imap.Update, 0, len(labels)*len(connectors))
for _, label := range labels {
if !WantLabel(label) {
continue
}
for _, c := range connectors {
update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name)
updates = append(updates, update)
c.publishUpdate(ctx, update)
}
}
return updates, nil
}
updates, err := s.sendRequest(ctx, request)
if err != nil {
return err
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return fmt.Errorf("could not sync system labels: %w", err)
}
return nil
}
func (s *SyncUpdateApplier) SyncLabels(ctx context.Context, labels map[string]proton.Label) error {
request := func(ctx context.Context, _ usertypes.AddressMode, connectors map[string]*Connector) ([]imap.Update, error) {
return syncLabels(ctx, labels, maps.Values(connectors))
}
updates, err := s.sendRequest(ctx, request)
if err != nil {
return err
}
if err := waitOnIMAPUpdates(ctx, updates); err != nil {
return fmt.Errorf("could not sync labels: %w", err)
}
return nil
}
//nolint:exhaustive
func syncLabels(ctx context.Context, labels map[string]proton.Label, connectors []*Connector) ([]imap.Update, error) {
var updates []imap.Update
// Create placeholder Folders/Labels mailboxes with the \Noselect attribute.
for _, prefix := range []string{folderPrefix, labelPrefix} {
for _, updateCh := range connectors {
update := newPlaceHolderMailboxCreatedUpdate(prefix)
updateCh.publishUpdate(ctx, update)
updates = append(updates, update)
}
}
// Sync the user's labels.
for labelID, label := range labels {
if !WantLabel(label) {
continue
}
switch label.Type {
case proton.LabelTypeSystem:
for _, updateCh := range connectors {
update := newSystemMailboxCreatedUpdate(imap.MailboxID(label.ID), label.Name)
updateCh.publishUpdate(ctx, update)
updates = append(updates, update)
}
case proton.LabelTypeFolder, proton.LabelTypeLabel:
for _, updateCh := range connectors {
update := newMailboxCreatedUpdate(imap.MailboxID(labelID), GetMailboxName(label))
updateCh.publishUpdate(ctx, update)
updates = append(updates, update)
}
default:
return nil, fmt.Errorf("unknown label type: %d", label.Type)
}
}
return updates, nil
}
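// sendRequest ships the closure to the owning goroutine via requestCh and
// blocks until the matching reply arrives on replyCh or the context is
// cancelled.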
func (s *SyncUpdateApplier) sendRequest(ctx context.Context, request updateRequest) ([]imap.Update, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case s.requestCh <- request:
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case reply, ok := <-s.replyCh:
if !ok {
return nil, fmt.Errorf("no reply")
}
if reply.err != nil {
return nil, reply.err
}
return reply.updates, nil
}
}
func (s *SyncUpdateApplier) reply(ctx context.Context, updates []imap.Update, err error) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.replyCh <- updateReply{
updates: updates,
err: err,
}:
return nil
}
}
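
The consuming half of this handshake is not part of this file. A minimal sketch of what the service loop on the other side of requestCh/replyCh presumably looks like; the function name and the ownership of mode and connectors are assumptions, not part of this diff:

package imapservice

import (
	"context"

	"github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
)

// Hypothetical consumer loop; the real one lives in the IMAP service and is
// not shown in this excerpt.
func serveSyncUpdates(ctx context.Context, applier *SyncUpdateApplier, mode usertypes.AddressMode, connectors map[string]*Connector) {
	for {
		select {
		case <-ctx.Done():
			return
		case req, ok := <-applier.requestCh:
			if !ok {
				return
			}
			// Execute the closure on this goroutine, which owns the
			// connector map, then hand the outcome back to the caller
			// blocked inside sendRequest.
			updates, err := req(ctx, mode, connectors)
			if replyErr := applier.reply(ctx, updates, err); replyErr != nil {
				return // context cancelled while replying
			}
		}
	}
}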

View File

@ -32,6 +32,7 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
bridgesmtp "github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
"github.com/ProtonMail/proton-bridge/v3/pkg/cpc"
"github.com/emersion/go-smtp"
"github.com/sirupsen/logrus"
@ -134,7 +135,7 @@ func (sm *Service) AddIMAPUser(
connector connector.Connector,
addrID string,
idProvider imapservice.GluonIDProvider,
syncStateProvider imapservice.SyncStateProvider,
syncStateProvider syncservice.StateProvider,
) error {
_, err := sm.requests.Send(ctx, &smRequestAddIMAPUser{
connector: connector,
@ -302,7 +303,7 @@ func (sm *Service) handleAddIMAPUser(ctx context.Context,
connector connector.Connector,
addrID string,
idProvider imapservice.GluonIDProvider,
syncStateProvider imapservice.SyncStateProvider,
syncStateProvider syncservice.StateProvider,
) error {
// Due to the many different error exits, we split the incrementing of the
// user count from the rest of the logic rather than changing it at this stage.
@ -318,7 +319,7 @@ func (sm *Service) handleAddIMAPUserImpl(ctx context.Context,
connector connector.Connector,
addrID string,
idProvider imapservice.GluonIDProvider,
syncStateProvider imapservice.SyncStateProvider,
syncStateProvider syncservice.StateProvider,
) error {
if sm.imapServer == nil {
return fmt.Errorf("no imap server instance running")
@ -348,7 +349,7 @@ func (sm *Service) handleAddIMAPUserImpl(ctx context.Context,
}
// Clear the sync status -- we need to resync all messages.
if err := syncStateProvider.ClearSyncStatus(); err != nil {
if err := syncStateProvider.ClearSyncStatus(ctx); err != nil {
return fmt.Errorf("failed to clear sync status: %w", err)
}
@ -358,7 +359,14 @@ func (sm *Service) handleAddIMAPUserImpl(ctx context.Context,
} else if isNew {
panic("IMAP user should already have a database")
}
} else if status := syncStateProvider.GetSyncStatus(); !status.HasLabels {
}
status, err := syncStateProvider.GetSyncStatus(ctx)
if err != nil {
return fmt.Errorf("failed to get sync status: %w", err)
}
if !status.HasLabels {
// Otherwise, the DB already exists -- if the labels are not yet synced, we need to re-create the DB.
if err := sm.imapServer.RemoveUser(ctx, gluonID, true); err != nil {
return fmt.Errorf("failed to remove old IMAP user: %w", err)
@ -710,7 +718,7 @@ type smRequestAddIMAPUser struct {
connector connector.Connector
addrID string
idProvider imapservice.GluonIDProvider
syncStateProvider imapservice.SyncStateProvider
syncStateProvider syncservice.StateProvider
}
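
For reference, the syncservice.StateProvider interface that replaces imapservice.SyncStateProvider here is defined elsewhere in this commit; judging from the SyncState methods shown earlier, its shape is presumably close to the following sketch:

// Inferred shape of syncservice.StateProvider; this is an assumption, and
// the authoritative definition lives in internal/services/syncservice.
package syncservice

import "context"

type StateProvider interface {
	AddFailedMessageID(ctx context.Context, ids ...string) error
	RemFailedMessageID(ctx context.Context, ids ...string) error
	GetSyncStatus(ctx context.Context) (Status, error)
	ClearSyncStatus(ctx context.Context) error
	SetHasLabels(ctx context.Context, hasLabels bool) error
	SetHasMessages(ctx context.Context, hasMessages bool) error
	SetLastMessageID(ctx context.Context, messageID string, numSynced int64) error
	SetMessageCount(ctx context.Context, count int64) error
}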
type smRequestRemoveIMAPUser struct {

View File

@ -138,7 +138,10 @@ func (apm DiagnosticMetadata) BuildMailboxToMessageMap(ctx context.Context, user
}
func (user *User) GetDiagnosticMetadata(ctx context.Context) (DiagnosticMetadata, error) {
failedMessages := xmaps.SetFromSlice(user.vault.SyncStatus().FailedMessageIDs)
failedMessages, err := user.imapService.GetSyncFailedMessageIDs(ctx)
if err != nil {
return DiagnosticMetadata{}, err
}
messageIDs, err := user.client.GetAllMessageIDs(ctx, "")
if err != nil {
@ -159,7 +162,7 @@ func (user *User) GetDiagnosticMetadata(ctx context.Context) (DiagnosticMetadata
return DiagnosticMetadata{
MessageIDs: messageIDs,
Metadata: meta,
FailedMessageIDs: failedMessages,
FailedMessageIDs: xmaps.SetFromSlice(failedMessages),
}, nil
}

View File

@ -34,6 +34,7 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
"github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
"github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
telemetryservice "github.com/ProtonMail/proton-bridge/v3/internal/services/telemetry"
"github.com/ProtonMail/proton-bridge/v3/internal/services/userevents"
"github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
@ -102,6 +103,8 @@ func New(
imapServerManager imapservice.IMAPServerManager,
smtpServerManager smtp.ServerManager,
eventSubscription events.Subscription,
syncService syncservice.Regulator,
syncConfigDir string,
) (*User, error) {
user, err := newImpl(
ctx,
@ -117,6 +120,8 @@ func New(
imapServerManager,
smtpServerManager,
eventSubscription,
syncService,
syncConfigDir,
)
if err != nil {
// Cleanup any pending resources on error
@ -145,9 +150,27 @@ func newImpl(
imapServerManager imapservice.IMAPServerManager,
smtpServerManager smtp.ServerManager,
eventSubscription events.Subscription,
syncService syncservice.Regulator,
syncConfigDir string,
) (*User, error) {
logrus.WithField("userID", apiUser.ID).Info("Creating new user")
// Migrate Sync Status from Vault.
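// This is a one-time move: the sync status is written to its own file
// first, and only after that write succeeds is the copy held in the vault
// cleared, so a failure part-way through keeps the vault data intact.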
{
syncStatus := encVault.SyncStatus()
migrated, err := imapservice.MigrateVaultSettings(syncConfigDir, apiUser.ID, syncStatus.HasLabels, syncStatus.HasMessages, syncStatus.FailedMessageIDs)
if err != nil {
return nil, fmt.Errorf("failed to migrate user sync settings: %w", err)
}
if migrated {
if err := encVault.ClearSyncStatus(); err != nil {
return nil, fmt.Errorf("failed to clear sync settings from vault: %w", err)
}
}
}
// Get the user's API addresses.
apiAddrs, err := client.GetAddresses(ctx)
if err != nil {
@ -238,7 +261,6 @@ func newImpl(
client,
identityState.Clone(),
user,
encVault,
user.eventService,
imapServerManager,
user,
@ -250,6 +272,7 @@ func newImpl(
reporter,
addressMode,
eventSubscription,
syncConfigDir,
user.maxSyncMemory,
showAllMail,
)
@ -299,7 +322,7 @@ func newImpl(
}
// Start IMAP Service
if err := user.imapService.Start(ctx, user.serviceGroup); err != nil {
if err := user.imapService.Start(ctx, user.serviceGroup, syncService); err != nil {
return user, fmt.Errorf("failed to start imap service: %w", err)
}

View File

@ -156,6 +156,8 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma
nullIMAPServerManager,
nullSMTPServerManager,
nullEventSubscription,
nil,
"",
)
require.NoError(tb, err)
defer user.Close()