diff --git a/internal/events/events.go b/internal/events/events.go index 84933cc9..2d5f6683 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -35,3 +35,7 @@ func (eventBase) _isEvent() {} type EventPublisher interface { PublishEvent(ctx context.Context, event Event) } + +type NullEventPublisher struct{} + +func (NullEventPublisher) PublishEvent(_ context.Context, _ Event) {} diff --git a/internal/user/user.go b/internal/user/user.go index f1691a61..2fdbb86a 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -42,6 +42,8 @@ import ( "github.com/ProtonMail/proton-bridge/v3/internal/safe" "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder" "github.com/ProtonMail/proton-bridge/v3/internal/services/smtp" + "github.com/ProtonMail/proton-bridge/v3/internal/services/userevents" + "github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity" "github.com/ProtonMail/proton-bridge/v3/internal/telemetry" "github.com/ProtonMail/proton-bridge/v3/internal/usertypes" "github.com/ProtonMail/proton-bridge/v3/internal/vault" @@ -97,14 +99,15 @@ type User struct { maxSyncMemory uint64 - panicHandler async.PanicHandler - + panicHandler async.PanicHandler configStatus *configstatus.ConfigurationStatus telemetryManager telemetry.Availability // goStatusProgress triggers a check/sending if progress is needed. goStatusProgress func() - smtpService *smtp.Service + eventService *userevents.Service + identityService *useridentity.Service + smtpService *smtp.Service } // New returns a new user. @@ -134,6 +137,8 @@ func New( return nil, fmt.Errorf("failed to get labels: %w", err) } + identityState := useridentity.NewState(apiUser, slices.Clone(apiAddrs), client) + logrus.WithFields(logrus.Fields{ "userID": apiUser.ID, "numAddr": len(apiAddrs), @@ -146,6 +151,28 @@ func New( return nil, fmt.Errorf("failed to init configuration status file: %w", err) } + // Use null publisher for now to avoid conflicts with original event loop. + eventPublisher := &events.NullEventPublisher{} + + // Use in memory store to avoid conflicts with original event loop. + idStore := userevents.NewInMemoryEventIDStore() + _ = idStore.Store(context.Background(), encVault.EventID()) + + eventService := userevents.NewService( + apiUser.ID, + client, + // Use in memory store to avoid conflicts with the original event loop. + idStore, + eventPublisher, + EventPeriod, + 5*time.Minute, + crashHandler, + ) + + sendRecorder := sendrecorder.NewSendRecorder(sendrecorder.SendEntryExpiry) + + identityService := useridentity.NewService(eventService, eventPublisher, identityState) + // Create the user object. user := &User{ log: logrus.WithField("userID", apiUser.ID), @@ -153,7 +180,7 @@ func New( vault: encVault, client: client, reporter: reporter, - sendHash: sendrecorder.NewSendRecorder(sendrecorder.SendEntryExpiry), + sendHash: sendRecorder, eventCh: async.NewQueuedChannel[events.Event](0, 0, crashHandler, fmt.Sprintf("bridge-user-%v", apiUser.ID)), eventLock: safe.NewRWMutex(), @@ -181,9 +208,25 @@ func New( configStatus: configStatus, telemetryManager: telemetryManager, + + identityService: identityService, + smtpService: nil, + eventService: eventService, } - user.smtpService = smtp.NewService(user, client, user.sendHash, user.panicHandler, user.reporter) + user.smtpService = smtp.NewService( + apiUser.ID, + client, + sendRecorder, + crashHandler, + reporter, + encVault, + encVault, + user, + eventService, + vaultToSMTPAddressMode(encVault.AddressMode()), + identityState.Clone(), + ) // Check for status_progress when triggered. user.goStatusProgress = user.tasks.PeriodicOrTrigger(configstatus.ProgressCheckInterval, 0, func(ctx context.Context) { @@ -271,9 +314,21 @@ func New( } }) + // Start Event Service + if err := user.eventService.Start(ctx, user.tasks); err != nil { + return nil, fmt.Errorf("failed to start event service: %w", err) + } + + // Start Identity Service + user.identityService.Start(user.tasks) + // Start SMTP Service user.smtpService.Start(user.tasks) + if err := user.eventService.Resume(ctx); err != nil { + return nil, fmt.Errorf("failed to resume event service") + } + return user, nil } @@ -336,7 +391,7 @@ func (user *User) GetAddressMode() vault.AddressMode { } // SetAddressMode sets the user's address mode. -func (user *User) SetAddressMode(_ context.Context, mode vault.AddressMode) error { +func (user *User) SetAddressMode(ctx context.Context, mode vault.AddressMode) error { user.log.WithField("mode", mode).Info("Setting address mode") user.syncAbort.Abort() @@ -347,6 +402,10 @@ func (user *User) SetAddressMode(_ context.Context, mode vault.AddressMode) erro return fmt.Errorf("failed to set address mode: %w", err) } + if err := user.smtpService.SetAddressMode(ctx, vaultToSMTPAddressMode(mode)); err != nil { + return fmt.Errorf("failed to set smtp address mode: %w", err) + } + if err := user.clearSyncStatus(); err != nil { return fmt.Errorf("failed to clear sync status: %w", err) } @@ -617,6 +676,9 @@ func (user *User) Close() { // Close the user's notify channel. user.eventCh.CloseAndDiscardQueued() + // Cleanup Event Service. + user.eventService.Close() + // Close the user's vault. if err := user.vault.Close(); err != nil { user.log.WithError(err).Error("Failed to close vault") @@ -855,3 +917,15 @@ func sleepCtx(ctx context.Context, d time.Duration) { case <-time.After(d): } } + +func vaultToSMTPAddressMode(mode vault.AddressMode) smtp.AddressMode { + var smtpAddressMode smtp.AddressMode + + if mode == vault.SplitMode { + smtpAddressMode = smtp.AddressModeSplit + } else { + smtpAddressMode = smtp.AddressModeCombined + } + + return smtpAddressMode +}