1
0

feat(GODT-2801): User Service Integration

Enable the User Event, User Identity and User SMTP service. The services
don't persist any data at the moment to avoid conflict with the existing
event loop.
This commit is contained in:
Leander Beernaert
2023-07-24 17:09:03 +02:00
parent 0b35f41ffd
commit 09c523e2d2
2 changed files with 84 additions and 6 deletions

View File

@ -35,3 +35,7 @@ func (eventBase) _isEvent() {}
type EventPublisher interface { type EventPublisher interface {
PublishEvent(ctx context.Context, event Event) PublishEvent(ctx context.Context, event Event)
} }
type NullEventPublisher struct{}
func (NullEventPublisher) PublishEvent(_ context.Context, _ Event) {}

View File

@ -42,6 +42,8 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/safe" "github.com/ProtonMail/proton-bridge/v3/internal/safe"
"github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder" "github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
"github.com/ProtonMail/proton-bridge/v3/internal/services/smtp" "github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
"github.com/ProtonMail/proton-bridge/v3/internal/services/userevents"
"github.com/ProtonMail/proton-bridge/v3/internal/services/useridentity"
"github.com/ProtonMail/proton-bridge/v3/internal/telemetry" "github.com/ProtonMail/proton-bridge/v3/internal/telemetry"
"github.com/ProtonMail/proton-bridge/v3/internal/usertypes" "github.com/ProtonMail/proton-bridge/v3/internal/usertypes"
"github.com/ProtonMail/proton-bridge/v3/internal/vault" "github.com/ProtonMail/proton-bridge/v3/internal/vault"
@ -97,14 +99,15 @@ type User struct {
maxSyncMemory uint64 maxSyncMemory uint64
panicHandler async.PanicHandler panicHandler async.PanicHandler
configStatus *configstatus.ConfigurationStatus configStatus *configstatus.ConfigurationStatus
telemetryManager telemetry.Availability telemetryManager telemetry.Availability
// goStatusProgress triggers a check/sending if progress is needed. // goStatusProgress triggers a check/sending if progress is needed.
goStatusProgress func() goStatusProgress func()
smtpService *smtp.Service eventService *userevents.Service
identityService *useridentity.Service
smtpService *smtp.Service
} }
// New returns a new user. // New returns a new user.
@ -134,6 +137,8 @@ func New(
return nil, fmt.Errorf("failed to get labels: %w", err) return nil, fmt.Errorf("failed to get labels: %w", err)
} }
identityState := useridentity.NewState(apiUser, slices.Clone(apiAddrs), client)
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"userID": apiUser.ID, "userID": apiUser.ID,
"numAddr": len(apiAddrs), "numAddr": len(apiAddrs),
@ -146,6 +151,28 @@ func New(
return nil, fmt.Errorf("failed to init configuration status file: %w", err) return nil, fmt.Errorf("failed to init configuration status file: %w", err)
} }
// Use null publisher for now to avoid conflicts with original event loop.
eventPublisher := &events.NullEventPublisher{}
// Use in memory store to avoid conflicts with original event loop.
idStore := userevents.NewInMemoryEventIDStore()
_ = idStore.Store(context.Background(), encVault.EventID())
eventService := userevents.NewService(
apiUser.ID,
client,
// Use in memory store to avoid conflicts with the original event loop.
idStore,
eventPublisher,
EventPeriod,
5*time.Minute,
crashHandler,
)
sendRecorder := sendrecorder.NewSendRecorder(sendrecorder.SendEntryExpiry)
identityService := useridentity.NewService(eventService, eventPublisher, identityState)
// Create the user object. // Create the user object.
user := &User{ user := &User{
log: logrus.WithField("userID", apiUser.ID), log: logrus.WithField("userID", apiUser.ID),
@ -153,7 +180,7 @@ func New(
vault: encVault, vault: encVault,
client: client, client: client,
reporter: reporter, reporter: reporter,
sendHash: sendrecorder.NewSendRecorder(sendrecorder.SendEntryExpiry), sendHash: sendRecorder,
eventCh: async.NewQueuedChannel[events.Event](0, 0, crashHandler, fmt.Sprintf("bridge-user-%v", apiUser.ID)), eventCh: async.NewQueuedChannel[events.Event](0, 0, crashHandler, fmt.Sprintf("bridge-user-%v", apiUser.ID)),
eventLock: safe.NewRWMutex(), eventLock: safe.NewRWMutex(),
@ -181,9 +208,25 @@ func New(
configStatus: configStatus, configStatus: configStatus,
telemetryManager: telemetryManager, telemetryManager: telemetryManager,
identityService: identityService,
smtpService: nil,
eventService: eventService,
} }
user.smtpService = smtp.NewService(user, client, user.sendHash, user.panicHandler, user.reporter) user.smtpService = smtp.NewService(
apiUser.ID,
client,
sendRecorder,
crashHandler,
reporter,
encVault,
encVault,
user,
eventService,
vaultToSMTPAddressMode(encVault.AddressMode()),
identityState.Clone(),
)
// Check for status_progress when triggered. // Check for status_progress when triggered.
user.goStatusProgress = user.tasks.PeriodicOrTrigger(configstatus.ProgressCheckInterval, 0, func(ctx context.Context) { user.goStatusProgress = user.tasks.PeriodicOrTrigger(configstatus.ProgressCheckInterval, 0, func(ctx context.Context) {
@ -271,9 +314,21 @@ func New(
} }
}) })
// Start Event Service
if err := user.eventService.Start(ctx, user.tasks); err != nil {
return nil, fmt.Errorf("failed to start event service: %w", err)
}
// Start Identity Service
user.identityService.Start(user.tasks)
// Start SMTP Service // Start SMTP Service
user.smtpService.Start(user.tasks) user.smtpService.Start(user.tasks)
if err := user.eventService.Resume(ctx); err != nil {
return nil, fmt.Errorf("failed to resume event service")
}
return user, nil return user, nil
} }
@ -336,7 +391,7 @@ func (user *User) GetAddressMode() vault.AddressMode {
} }
// SetAddressMode sets the user's address mode. // SetAddressMode sets the user's address mode.
func (user *User) SetAddressMode(_ context.Context, mode vault.AddressMode) error { func (user *User) SetAddressMode(ctx context.Context, mode vault.AddressMode) error {
user.log.WithField("mode", mode).Info("Setting address mode") user.log.WithField("mode", mode).Info("Setting address mode")
user.syncAbort.Abort() user.syncAbort.Abort()
@ -347,6 +402,10 @@ func (user *User) SetAddressMode(_ context.Context, mode vault.AddressMode) erro
return fmt.Errorf("failed to set address mode: %w", err) return fmt.Errorf("failed to set address mode: %w", err)
} }
if err := user.smtpService.SetAddressMode(ctx, vaultToSMTPAddressMode(mode)); err != nil {
return fmt.Errorf("failed to set smtp address mode: %w", err)
}
if err := user.clearSyncStatus(); err != nil { if err := user.clearSyncStatus(); err != nil {
return fmt.Errorf("failed to clear sync status: %w", err) return fmt.Errorf("failed to clear sync status: %w", err)
} }
@ -617,6 +676,9 @@ func (user *User) Close() {
// Close the user's notify channel. // Close the user's notify channel.
user.eventCh.CloseAndDiscardQueued() user.eventCh.CloseAndDiscardQueued()
// Cleanup Event Service.
user.eventService.Close()
// Close the user's vault. // Close the user's vault.
if err := user.vault.Close(); err != nil { if err := user.vault.Close(); err != nil {
user.log.WithError(err).Error("Failed to close vault") user.log.WithError(err).Error("Failed to close vault")
@ -855,3 +917,15 @@ func sleepCtx(ctx context.Context, d time.Duration) {
case <-time.After(d): case <-time.After(d):
} }
} }
func vaultToSMTPAddressMode(mode vault.AddressMode) smtp.AddressMode {
var smtpAddressMode smtp.AddressMode
if mode == vault.SplitMode {
smtpAddressMode = smtp.AddressModeSplit
} else {
smtpAddressMode = smtp.AddressModeCombined
}
return smtpAddressMode
}