diff --git a/go.mod b/go.mod
index 05281c68..fba5154f 100644
--- a/go.mod
+++ b/go.mod
@@ -9,7 +9,7 @@ require (
github.com/Masterminds/semver/v3 v3.2.0
github.com/ProtonMail/gluon v0.17.1-0.20240514133734-79cdd0fec41c
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a
- github.com/ProtonMail/go-proton-api v0.4.1-0.20240821081056-dd607af0f917
+ github.com/ProtonMail/go-proton-api v0.4.1-0.20240827122236-ca6bb6449bba
github.com/ProtonMail/gopenpgp/v2 v2.7.4-proton
github.com/PuerkitoBio/goquery v1.8.1
github.com/abiosoft/ishell v2.0.0+incompatible
diff --git a/go.sum b/go.sum
index c4dd64ba..041fe488 100644
--- a/go.sum
+++ b/go.sum
@@ -46,6 +46,12 @@ github.com/ProtonMail/go-proton-api v0.4.1-0.20240819131705-149e50199c5b h1:zifG
github.com/ProtonMail/go-proton-api v0.4.1-0.20240819131705-149e50199c5b/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
github.com/ProtonMail/go-proton-api v0.4.1-0.20240821081056-dd607af0f917 h1:Ma6PfXFDuw7rYYq28FXNW6ubhYquRUmBuLyZrjJWHUE=
github.com/ProtonMail/go-proton-api v0.4.1-0.20240821081056-dd607af0f917/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
+github.com/ProtonMail/go-proton-api v0.4.1-0.20240822150235-7a6190889179 h1:6Xo0iRYa4GBgZ2HA+IR3KdqiML8Z10h2F9TYe+9n1+M=
+github.com/ProtonMail/go-proton-api v0.4.1-0.20240822150235-7a6190889179/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
+github.com/ProtonMail/go-proton-api v0.4.1-0.20240827084449-71096377c391 h1:PW6bE+mhsfAx4+wDCCNjhFrCNiiuMjY6j7RwqRUdPKI=
+github.com/ProtonMail/go-proton-api v0.4.1-0.20240827084449-71096377c391/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
+github.com/ProtonMail/go-proton-api v0.4.1-0.20240827122236-ca6bb6449bba h1:QtDxgIbgPqRQg7VT+nIUJlaOyNFAoGyg59oW3Hji/0A=
+github.com/ProtonMail/go-proton-api v0.4.1-0.20240827122236-ca6bb6449bba/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
github.com/ProtonMail/go-smtp v0.0.0-20231109081432-2b3d50599865 h1:EP1gnxLL5Z7xBSymE9nSTM27nRYINuvssAtDmG0suD8=
github.com/ProtonMail/go-smtp v0.0.0-20231109081432-2b3d50599865/go.mod h1:qm27SGYgoIPRot6ubfQ/GpiPy/g3PaZAVRxiO/sDUgQ=
github.com/ProtonMail/go-srp v0.0.7 h1:Sos3Qk+th4tQR64vsxGIxYpN3rdnG9Wf9K4ZloC1JrI=
diff --git a/internal/bridge/bridge.go b/internal/bridge/bridge.go
index e18270c3..79d76426 100644
--- a/internal/bridge/bridge.go
+++ b/internal/bridge/bridge.go
@@ -45,6 +45,7 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
"github.com/ProtonMail/proton-bridge/v3/internal/sentry"
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapsmtpserver"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
"github.com/ProtonMail/proton-bridge/v3/internal/telemetry"
"github.com/ProtonMail/proton-bridge/v3/internal/unleash"
@@ -137,8 +138,12 @@ type Bridge struct {
serverManager *imapsmtpserver.Service
syncService *syncservice.Service
+
// unleashService is responsible for polling the feature flags and caching
unleashService *unleash.Service
+
+ // observabilityService is responsible for handling calls to the observability system
+ observabilityService *observability.Service
}
var logPkg = logrus.WithField("pkg", "bridge") //nolint:gochecknoglobals
@@ -300,6 +305,8 @@ func newBridge(
syncService: syncservice.NewService(reporter, panicHandler),
unleashService: unleashService,
+
+ observabilityService: observability.NewService(ctx, panicHandler),
}
bridge.serverManager = imapsmtpserver.NewService(context.Background(),
@@ -329,6 +336,8 @@ func newBridge(
bridge.unleashService.Run()
+ bridge.observabilityService.Run()
+
return bridge, nil
}
@@ -456,6 +465,9 @@ func (bridge *Bridge) GetErrors() []error {
func (bridge *Bridge) Close(ctx context.Context) {
logPkg.Info("Closing bridge")
+ // Stop observability service
+ bridge.observabilityService.Stop()
+
// Stop heart beat before closing users.
bridge.heartbeat.stop()
@@ -690,3 +702,7 @@ func GetUpdatedCachePath(gluonDBPath, gluonCachePath string) string {
func (bridge *Bridge) GetFeatureFlagValue(key string) bool {
return bridge.unleashService.GetFlagValue(key)
}
+
+// PushObservabilityMetric hands the given metric to the bridge's observability service
+// by calling its AddMetric method.
+func (bridge *Bridge) PushObservabilityMetric(metric proton.ObservabilityMetric) {
+ bridge.observabilityService.AddMetric(metric)
+}
diff --git a/internal/bridge/observability_test.go b/internal/bridge/observability_test.go
new file mode 100644
index 00000000..4cd7921d
--- /dev/null
+++ b/internal/bridge/observability_test.go
@@ -0,0 +1,97 @@
+// Copyright (c) 2024 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
+
+package bridge_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/go-proton-api/server"
+ "github.com/ProtonMail/proton-bridge/v3/internal/bridge"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
+ "github.com/stretchr/testify/require"
+ "golang.org/x/net/context"
+)
+
+// TestBridge_Observability pushes metrics through Bridge.PushObservabilityMetric and checks,
+// via the test server's observability statistics, how many metrics have arrived at each step
+// and that consecutive requests are separated by more than the throttle period.
+// The test is timing based: it relies on the sleeps below relative to the 500ms throttle period.
+func TestBridge_Observability(t *testing.T) {
+ testMetric := proton.ObservabilityMetric{
+ Name: "test1",
+ Version: 1,
+ Timestamp: time.Now().Unix(),
+ Data: nil,
+ }
+
+ withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, vaultKey []byte) {
+ // Shorten the throttle period before the bridge is created.
+ // NOTE(review): this is a package-level setting and is not restored by this test.
+ throttlePeriod := time.Millisecond * 500
+ observability.ModifyThrottlePeriod(throttlePeriod)
+
+ withBridge(ctx, t, s.GetHostURL(), netCtl, locator, vaultKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
+ require.NoError(t, getErr(bridge.LoginFull(ctx, username, password, nil, nil)))
+
+ bridge.PushObservabilityMetric(testMetric)
+ time.Sleep(time.Millisecond * 50) // Wait for the metric to be sent
+ require.Equal(t, 1, len(s.GetObservabilityStatistics().Metrics))
+
+ for i := 0; i < 10; i++ {
+ time.Sleep(time.Millisecond * 5) // Minor delay between each so our tests aren't flaky
+ bridge.PushObservabilityMetric(testMetric)
+ }
+ // We should still have only 1 metric sent as the throttleDuration has not passed
+ require.Equal(t, 1, len(s.GetObservabilityStatistics().Metrics))
+
+ // Wait for throttle duration to pass; we should have our remaining metrics posted
+ time.Sleep(throttlePeriod)
+ require.Equal(t, 11, len(s.GetObservabilityStatistics().Metrics))
+
+ // Wait for the throttle duration to reset; i.e. so we have enough time to send a request immediately
+ time.Sleep(throttlePeriod)
+ for i := 0; i < 10; i++ {
+ time.Sleep(time.Millisecond * 5)
+ bridge.PushObservabilityMetric(testMetric)
+ }
+ // We should only have one additional metric sent immediately
+ require.Equal(t, 12, len(s.GetObservabilityStatistics().Metrics))
+
+ // Wait for the others to be sent
+ time.Sleep(throttlePeriod)
+ require.Equal(t, 21, len(s.GetObservabilityStatistics().Metrics))
+
+ // Spam the endpoint a bit: the first 200 pushes are spaced 10ms apart,
+ // the remaining 100 are pushed back-to-back.
+ for i := 0; i < 300; i++ {
+ if i < 200 {
+ time.Sleep(time.Millisecond * 10)
+ }
+ bridge.PushObservabilityMetric(testMetric)
+ }
+
+ // Ensure we've sent all metrics
+ time.Sleep(throttlePeriod)
+
+ // 21 metrics from the earlier steps plus the 300 pushed above.
+ observabilityStats := s.GetObservabilityStatistics()
+ require.Equal(t, 321, len(observabilityStats.Metrics))
+
+ // Verify that each request had a throttleDuration time difference between each request
+ for i := 0; i < len(observabilityStats.RequestTime)-1; i++ {
+ tOne := observabilityStats.RequestTime[i]
+ tTwo := observabilityStats.RequestTime[i+1]
+ require.True(t, tTwo.Sub(tOne).Abs() > throttlePeriod)
+ }
+ })
+ })
+}
diff --git a/internal/bridge/user.go b/internal/bridge/user.go
index 82b29b55..04455822 100644
--- a/internal/bridge/user.go
+++ b/internal/bridge/user.go
@@ -566,6 +566,7 @@ func (bridge *Bridge) addUserWithVault(
bridge.serverManager,
&bridgeEventSubscription{b: bridge},
bridge.syncService,
+ bridge.observabilityService,
syncSettingsPath,
isNew,
)
diff --git a/internal/services/observability/service.go b/internal/services/observability/service.go
new file mode 100644
index 00000000..d57f6d94
--- /dev/null
+++ b/internal/services/observability/service.go
@@ -0,0 +1,279 @@
+// Copyright (c) 2024 Proton AG
+//
+// This file is part of Proton Mail Bridge.
+//
+// Proton Mail Bridge is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Proton Mail Bridge is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
+
+package observability
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/ProtonMail/gluon/async"
+ "github.com/ProtonMail/go-proton-api"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/telemetry"
+ "github.com/sirupsen/logrus"
+)
+
+// throttleDuration is the interval the service keeps between two dispatches: start and
+// scheduleDispatch compare the time elapsed since lastDispatch against it.
+// Non-const for testing (see ModifyThrottlePeriod).
+var throttleDuration = 5 * time.Second //nolint:gochecknoglobals
+
+const (
+ // maxStorageSize is the store length at which AddMetric starts dropping the oldest metrics.
+ maxStorageSize = 5000
+ // maxBatchSize is the largest number of metrics dispatchData takes from the store per dispatch.
+ maxBatchSize = 1000
+)
+
+// client bundles the two per-user callbacks the service needs to dispatch metrics.
+// RegisterUserClient fills it from a telemetry service and a proton client.
+type client struct {
+ // isTelemetryEnabled reports whether metrics may be sent through this client.
+ isTelemetryEnabled func(context.Context) bool
+ // sendMetrics submits a batch of metrics and returns any error from the send.
+ sendMetrics func(context.Context, proton.ObservabilityBatch) error
+}
+
+// Service buffers observability metrics and dispatches them in batches through one of the
+// registered user clients, keeping at least throttleDuration between dispatches.
+type Service struct {
+ // ctx is the cancellable context created in NewService; cancel is invoked by Stop.
+ ctx context.Context
+ cancel context.CancelFunc
+
+ panicHandler async.PanicHandler
+
+ // lastDispatch is updated after each send attempt in dispatchViaClient.
+ // isDispatchScheduled is set by scheduleDispatch and cleared by dispatchData.
+ // Both are used from the start goroutine only, hence no mutex.
+ lastDispatch time.Time
+ isDispatchScheduled bool
+
+ // signalDataArrived is signalled by AddMetric and RegisterUserClient;
+ // signalDispatch makes the start loop call dispatchData.
+ signalDataArrived chan struct{}
+ signalDispatch chan struct{}
+
+ log *logrus.Entry
+
+ // metricStore holds the metrics waiting to be sent; guarded by metricStoreLock.
+ metricStore []proton.ObservabilityMetric
+ metricStoreLock sync.Mutex
+
+ // userClientStore maps a user ID to its registered client; guarded by userClientStoreLock.
+ userClientStore map[string]*client
+ userClientStoreLock sync.Mutex
+}
+
+// NewService builds an observability Service bound to a cancellable child of ctx.
+// The service does nothing until Run is called; panicHandler is used by the
+// goroutines the service spawns.
+func NewService(ctx context.Context, panicHandler async.PanicHandler) *Service {
+	serviceCtx, cancelFn := context.WithCancel(ctx)
+
+	return &Service{
+		ctx:    serviceCtx,
+		cancel: cancelFn,
+
+		panicHandler: panicHandler,
+
+		// Backdate the last dispatch by one throttle period so the very first
+		// dispatch does not have to wait.
+		lastDispatch: time.Now().Add(-throttleDuration),
+
+		// Capacity 1: a pending signal is enough, extra signals are dropped by sendSignal.
+		signalDataArrived: make(chan struct{}, 1),
+		signalDispatch:    make(chan struct{}, 1),
+
+		log: logrus.WithField("pkg", "observability"),
+
+		metricStore: make([]proton.ObservabilityMetric, 0),
+
+		userClientStore: make(map[string]*client),
+	}
+}
+
+// Run logs the start-up and launches the service's event loop (start) on its own
+// goroutine, returning immediately.
+func (s *Service) Run() {
+	s.log.Info("Starting service")
+	go s.start()
+}
+
+// start is the service's event loop; it runs until the service context is cancelled.
+//
+// A dispatch signal triggers dispatchData. A data-arrived signal is handled as follows:
+//   - if a dispatch is already scheduled, nothing is done;
+//   - otherwise, if the throttle interval since the last dispatch has not yet passed,
+//     a dispatch is scheduled for when it has;
+//   - otherwise a dispatch is signalled immediately.
+func (s *Service) start() {
+	defer async.HandlePanic(s.panicHandler)
+
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+
+		case <-s.signalDispatch:
+			s.dispatchData()
+
+		case <-s.signalDataArrived:
+			switch {
+			case s.isDispatchScheduled:
+				// A dispatch is already pending; it will pick up the new data.
+			case time.Since(s.lastDispatch) <= throttleDuration:
+				s.scheduleDispatch()
+			default:
+				s.sendSignal(s.signalDispatch)
+			}
+		}
+	}
+}
+
+// dispatchData removes up to maxBatchSize metrics from the front of the store and hands
+// them to dispatchViaClient. It returns early when there are no metrics or no registered
+// clients. If metrics remain afterwards and telemetry was enabled for a client, another
+// dispatch is scheduled.
+func (s *Service) dispatchData() {
+	// Only the start goroutine reads or writes this flag, so no mutex is needed.
+	s.isDispatchScheduled = false
+
+	if !s.haveMetricsAndClients() {
+		return
+	}
+
+	var (
+		batch     []proton.ObservabilityMetric
+		leftOver  int
+		batchSize int
+	)
+
+	// Copy the batch out of the store so it can be sent without holding the store lock.
+	s.withMetricStoreLock(func() {
+		batchSize = min(len(s.metricStore), maxBatchSize)
+		batch = make([]proton.ObservabilityMetric, batchSize)
+		copy(batch, s.metricStore[:batchSize])
+		s.metricStore = s.metricStore[batchSize:]
+		leftOver = len(s.metricStore)
+	})
+
+	// Send the batch; the result tells us whether any client has telemetry enabled.
+	if !s.dispatchViaClient(&batch) {
+		return
+	}
+
+	// More metrics than fit into one batch: schedule the next dispatch.
+	if leftOver > 0 {
+		s.scheduleDispatch()
+	}
+}
+
+// dispatchViaClient sends the given metrics through the first registered client that
+// reports telemetry as enabled, records the dispatch time and returns true. A send
+// error is logged but still counts as a dispatch. It returns false when no client has
+// telemetry enabled, so the caller knows not to schedule a follow-up dispatch.
+// The user client store lock is held for the duration of the call.
+func (s *Service) dispatchViaClient(metricsToSend *[]proton.ObservabilityMetric) bool {
+	s.log.Info("Sending observability data.")
+
+	s.userClientStoreLock.Lock()
+	defer s.userClientStoreLock.Unlock()
+
+	for _, userClient := range s.userClientStore {
+		if userClient.isTelemetryEnabled(s.ctx) {
+			batch := proton.ObservabilityBatch{Metrics: *metricsToSend}
+
+			err := userClient.sendMetrics(s.ctx, batch)
+			if err == nil {
+				s.log.Info("Successfully sent observability data.")
+			} else {
+				s.log.WithError(err).Error("Issue occurred when sending observability data.")
+			}
+
+			s.lastDispatch = time.Now()
+
+			return true
+		}
+	}
+
+	s.log.Info("Could not send observability data. Telemetry is not enabled.")
+
+	return false
+}
+
+// scheduleDispatch arranges for a dispatch signal once throttleDuration has elapsed since
+// the last dispatch. If that time has already elapsed the signal is sent right away;
+// otherwise the service is marked as having a scheduled dispatch and a goroutine sends
+// the signal after the remaining time, unless the service context is cancelled first.
+func (s *Service) scheduleDispatch() {
+	remaining := throttleDuration - time.Since(s.lastDispatch)
+
+	if remaining > 0 {
+		s.log.Info("Scheduling observability data sending")
+		s.isDispatchScheduled = true
+
+		go func() {
+			defer async.HandlePanic(s.panicHandler)
+
+			select {
+			case <-time.After(remaining):
+				s.sendSignal(s.signalDispatch)
+			case <-s.ctx.Done():
+				return
+			}
+		}()
+
+		return
+	}
+
+	// Throttle interval already over: dispatch without waiting.
+	s.sendSignal(s.signalDispatch)
+}
+
+// AddMetric appends metric to the store and signals the event loop that data has arrived.
+// When the store already holds maxStorageSize (or more) metrics, the oldest ones are
+// dropped first so that there is room for the new one.
+func (s *Service) AddMetric(metric proton.ObservabilityMetric) {
+	s.withMetricStoreLock(func() {
+		// Number of entries that must go so the store holds maxStorageSize-1 before appending.
+		if overflow := len(s.metricStore) - maxStorageSize + 1; overflow > 0 {
+			s.log.Info("Max metric storage size has been exceeded. Dropping oldest metrics.")
+			s.metricStore = s.metricStore[overflow:]
+		}
+
+		s.metricStore = append(s.metricStore, metric)
+	})
+
+	s.sendSignal(s.signalDataArrived)
+}
+
+// RegisterUserClient stores, under userID, the callbacks needed to send metrics on behalf
+// of that user: the telemetry service's IsTelemetryEnabled and the proton client's
+// SendObservabilityBatch. An existing entry for the same userID is replaced.
+func (s *Service) RegisterUserClient(userID string, protonClient *proton.Client, telemetryService *telemetry.Service) {
+	s.log.Info("Registering user client, ID:", userID)
+
+	entry := &client{
+		isTelemetryEnabled: telemetryService.IsTelemetryEnabled,
+		sendMetrics:        protonClient.SendObservabilityBatch,
+	}
+
+	s.withUserClientStoreLock(func() {
+		s.userClientStore[userID] = entry
+	})
+
+	// Metrics may have been stored before any client existed; signal so they get flushed.
+	s.sendSignal(s.signalDataArrived)
+}
+
+// DeregisterUserClient removes the client stored under userID, if any.
+func (s *Service) DeregisterUserClient(userID string) {
+	s.log.Info("De-registering user client, ID:", userID)
+
+	s.userClientStoreLock.Lock()
+	defer s.userClientStoreLock.Unlock()
+
+	delete(s.userClientStore, userID)
+}
+
+// Stop shuts the service down by cancelling its context. The event loop (start) and any
+// goroutine spawned by scheduleDispatch exit once they observe the cancellation.
+//
+// The signal channels are deliberately NOT closed: AddMetric, RegisterUserClient and
+// pending scheduleDispatch goroutines may still call sendSignal after Stop, and a send on
+// a closed channel panics even inside a select with a default case. Closing them would
+// also make a second call to Stop panic. Nothing waits for the channels to be closed;
+// they are simply garbage collected together with the service.
+//
+// Stop is safe to call more than once.
+func (s *Service) Stop() {
+	s.log.Info("Stopping service")
+
+	s.cancel()
+}
+
+// Utility functions below.
+
+// haveMetricsAndClients reports whether there is at least one stored metric and at least
+// one registered user client. Both store locks are held while checking, acquired in the
+// order metric store, then user client store.
+func (s *Service) haveMetricsAndClients() bool {
+	s.metricStoreLock.Lock()
+	defer s.metricStoreLock.Unlock()
+
+	s.userClientStoreLock.Lock()
+	defer s.userClientStoreLock.Unlock()
+
+	if len(s.metricStore) == 0 {
+		return false
+	}
+
+	return len(s.userClientStore) > 0
+}
+
+// withUserClientStoreLock runs fn while holding userClientStoreLock.
+// The lock is released via defer once fn has finished.
+func (s *Service) withUserClientStoreLock(fn func()) {
+ s.userClientStoreLock.Lock()
+ defer s.userClientStoreLock.Unlock()
+ fn()
+}
+
+// withMetricStoreLock runs fn while holding metricStoreLock.
+// The lock is released via defer once fn has finished.
+func (s *Service) withMetricStoreLock(fn func()) {
+ s.metricStoreLock.Lock()
+ defer s.metricStoreLock.Unlock()
+ fn()
+}
+
+// sendSignal attempts a non-blocking send on channel: if the send cannot proceed
+// immediately, the default case is taken and the signal is dropped.
+// We use buffered channels; we shouldn't block them.
+func (s *Service) sendSignal(channel chan struct{}) {
+ select {
+ case channel <- struct{}{}:
+ default:
+ }
+}
+
+// ModifyThrottlePeriod - used for testing.
+// It overwrites the package-level throttleDuration with duration; the assignment is not
+// synchronised.
+func ModifyThrottlePeriod(duration time.Duration) {
+ throttleDuration = duration
+}
diff --git a/internal/user/user.go b/internal/user/user.go
index 40784655..be8addcf 100644
--- a/internal/user/user.go
+++ b/internal/user/user.go
@@ -31,6 +31,7 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
"github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
"github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
"github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
@@ -86,6 +87,8 @@ type User struct {
imapService *imapservice.Service
telemetryService *telemetryservice.Service
+ observabilityService *observability.Service
+
serviceGroup *orderedtasks.OrderedCancelGroup
}
@@ -104,6 +107,7 @@ func New(
smtpServerManager smtp.ServerManager,
eventSubscription events.Subscription,
syncService syncservice.Regulator,
+ observabilityService *observability.Service,
syncConfigDir string,
isNew bool,
) (*User, error) {
@@ -122,6 +126,7 @@ func New(
smtpServerManager,
eventSubscription,
syncService,
+ observabilityService,
syncConfigDir,
isNew,
)
@@ -153,6 +158,7 @@ func newImpl(
smtpServerManager smtp.ServerManager,
eventSubscription events.Subscription,
syncService syncservice.Regulator,
+ observabilityService *observability.Service,
syncConfigDir string,
isNew bool,
) (*User, error) {
@@ -215,6 +221,8 @@ func newImpl(
serviceGroup: orderedtasks.NewOrderedCancelGroup(crashHandler),
smtpService: nil,
+
+ observabilityService: observabilityService,
}
user.eventService = userevents.NewService(
@@ -318,6 +326,9 @@ func newImpl(
// Start Identity Service
user.identityService.Start(ctx, user.serviceGroup)
+ // Add user client to observability service
+ observabilityService.RegisterUserClient(user.id, client, user.telemetryService)
+
// Start SMTP Service
if err := user.smtpService.Start(ctx, user.serviceGroup); err != nil {
return user, fmt.Errorf("failed to start smtp service: %w", err)
@@ -586,6 +597,9 @@ func (user *User) Logout(ctx context.Context, withAPI bool) error {
user.tasks.CancelAndWait()
+ // Deregister this user's client from the observability service.
+ user.observabilityService.DeregisterUserClient(user.id)
+
// Stop Services
user.serviceGroup.CancelAndWait()
@@ -619,6 +633,9 @@ func (user *User) Close() {
// Stop any ongoing background tasks.
user.tasks.CancelAndWait()
+ // Deregister this user's client from the observability service.
+ user.observabilityService.DeregisterUserClient(user.id)
+
// Stop Services
user.serviceGroup.CancelAndWait()
diff --git a/internal/user/user_test.go b/internal/user/user_test.go
index 1b27ea72..ecc8f0f5 100644
--- a/internal/user/user_test.go
+++ b/internal/user/user_test.go
@@ -29,6 +29,7 @@ import (
"github.com/ProtonMail/proton-bridge/v3/internal/certs"
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
+ "github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
"github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
"github.com/ProtonMail/proton-bridge/v3/internal/telemetry/mocks"
"github.com/ProtonMail/proton-bridge/v3/internal/vault"
@@ -164,6 +165,7 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma
nullSMTPServerManager,
nullEventSubscription,
nil,
+ observability.NewService(context.Background(), nil),
"",
true,
)