mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-10 04:36:43 +00:00
feat(BRIDGE-122): Observability service implementation
This commit is contained in:
2
go.mod
2
go.mod
@ -9,7 +9,7 @@ require (
|
|||||||
github.com/Masterminds/semver/v3 v3.2.0
|
github.com/Masterminds/semver/v3 v3.2.0
|
||||||
github.com/ProtonMail/gluon v0.17.1-0.20240514133734-79cdd0fec41c
|
github.com/ProtonMail/gluon v0.17.1-0.20240514133734-79cdd0fec41c
|
||||||
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a
|
github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a
|
||||||
github.com/ProtonMail/go-proton-api v0.4.1-0.20240821081056-dd607af0f917
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240827122236-ca6bb6449bba
|
||||||
github.com/ProtonMail/gopenpgp/v2 v2.7.4-proton
|
github.com/ProtonMail/gopenpgp/v2 v2.7.4-proton
|
||||||
github.com/PuerkitoBio/goquery v1.8.1
|
github.com/PuerkitoBio/goquery v1.8.1
|
||||||
github.com/abiosoft/ishell v2.0.0+incompatible
|
github.com/abiosoft/ishell v2.0.0+incompatible
|
||||||
|
|||||||
6
go.sum
6
go.sum
@ -46,6 +46,12 @@ github.com/ProtonMail/go-proton-api v0.4.1-0.20240819131705-149e50199c5b h1:zifG
|
|||||||
github.com/ProtonMail/go-proton-api v0.4.1-0.20240819131705-149e50199c5b/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240819131705-149e50199c5b/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
|
||||||
github.com/ProtonMail/go-proton-api v0.4.1-0.20240821081056-dd607af0f917 h1:Ma6PfXFDuw7rYYq28FXNW6ubhYquRUmBuLyZrjJWHUE=
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240821081056-dd607af0f917 h1:Ma6PfXFDuw7rYYq28FXNW6ubhYquRUmBuLyZrjJWHUE=
|
||||||
github.com/ProtonMail/go-proton-api v0.4.1-0.20240821081056-dd607af0f917/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240821081056-dd607af0f917/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
|
||||||
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240822150235-7a6190889179 h1:6Xo0iRYa4GBgZ2HA+IR3KdqiML8Z10h2F9TYe+9n1+M=
|
||||||
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240822150235-7a6190889179/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
|
||||||
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240827084449-71096377c391 h1:PW6bE+mhsfAx4+wDCCNjhFrCNiiuMjY6j7RwqRUdPKI=
|
||||||
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240827084449-71096377c391/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
|
||||||
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240827122236-ca6bb6449bba h1:QtDxgIbgPqRQg7VT+nIUJlaOyNFAoGyg59oW3Hji/0A=
|
||||||
|
github.com/ProtonMail/go-proton-api v0.4.1-0.20240827122236-ca6bb6449bba/go.mod h1:3A0cpdo0BIenIPjTG6u8EbzJ8uuJy7rVvM/NaynjCKA=
|
||||||
github.com/ProtonMail/go-smtp v0.0.0-20231109081432-2b3d50599865 h1:EP1gnxLL5Z7xBSymE9nSTM27nRYINuvssAtDmG0suD8=
|
github.com/ProtonMail/go-smtp v0.0.0-20231109081432-2b3d50599865 h1:EP1gnxLL5Z7xBSymE9nSTM27nRYINuvssAtDmG0suD8=
|
||||||
github.com/ProtonMail/go-smtp v0.0.0-20231109081432-2b3d50599865/go.mod h1:qm27SGYgoIPRot6ubfQ/GpiPy/g3PaZAVRxiO/sDUgQ=
|
github.com/ProtonMail/go-smtp v0.0.0-20231109081432-2b3d50599865/go.mod h1:qm27SGYgoIPRot6ubfQ/GpiPy/g3PaZAVRxiO/sDUgQ=
|
||||||
github.com/ProtonMail/go-srp v0.0.7 h1:Sos3Qk+th4tQR64vsxGIxYpN3rdnG9Wf9K4ZloC1JrI=
|
github.com/ProtonMail/go-srp v0.0.7 h1:Sos3Qk+th4tQR64vsxGIxYpN3rdnG9Wf9K4ZloC1JrI=
|
||||||
|
|||||||
@ -45,6 +45,7 @@ import (
|
|||||||
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
|
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/sentry"
|
"github.com/ProtonMail/proton-bridge/v3/internal/sentry"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapsmtpserver"
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapsmtpserver"
|
||||||
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/syncservice"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/telemetry"
|
"github.com/ProtonMail/proton-bridge/v3/internal/telemetry"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/unleash"
|
"github.com/ProtonMail/proton-bridge/v3/internal/unleash"
|
||||||
@ -137,8 +138,12 @@ type Bridge struct {
|
|||||||
|
|
||||||
serverManager *imapsmtpserver.Service
|
serverManager *imapsmtpserver.Service
|
||||||
syncService *syncservice.Service
|
syncService *syncservice.Service
|
||||||
|
|
||||||
// unleashService is responsible for polling the feature flags and caching
|
// unleashService is responsible for polling the feature flags and caching
|
||||||
unleashService *unleash.Service
|
unleashService *unleash.Service
|
||||||
|
|
||||||
|
// observabilityService is responsible for handling calls to the observability system
|
||||||
|
observabilityService *observability.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
var logPkg = logrus.WithField("pkg", "bridge") //nolint:gochecknoglobals
|
var logPkg = logrus.WithField("pkg", "bridge") //nolint:gochecknoglobals
|
||||||
@ -300,6 +305,8 @@ func newBridge(
|
|||||||
syncService: syncservice.NewService(reporter, panicHandler),
|
syncService: syncservice.NewService(reporter, panicHandler),
|
||||||
|
|
||||||
unleashService: unleashService,
|
unleashService: unleashService,
|
||||||
|
|
||||||
|
observabilityService: observability.NewService(ctx, panicHandler),
|
||||||
}
|
}
|
||||||
|
|
||||||
bridge.serverManager = imapsmtpserver.NewService(context.Background(),
|
bridge.serverManager = imapsmtpserver.NewService(context.Background(),
|
||||||
@ -329,6 +336,8 @@ func newBridge(
|
|||||||
|
|
||||||
bridge.unleashService.Run()
|
bridge.unleashService.Run()
|
||||||
|
|
||||||
|
bridge.observabilityService.Run()
|
||||||
|
|
||||||
return bridge, nil
|
return bridge, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -456,6 +465,9 @@ func (bridge *Bridge) GetErrors() []error {
|
|||||||
func (bridge *Bridge) Close(ctx context.Context) {
|
func (bridge *Bridge) Close(ctx context.Context) {
|
||||||
logPkg.Info("Closing bridge")
|
logPkg.Info("Closing bridge")
|
||||||
|
|
||||||
|
// Stop observability service
|
||||||
|
bridge.observabilityService.Stop()
|
||||||
|
|
||||||
// Stop heart beat before closing users.
|
// Stop heart beat before closing users.
|
||||||
bridge.heartbeat.stop()
|
bridge.heartbeat.stop()
|
||||||
|
|
||||||
@ -690,3 +702,7 @@ func GetUpdatedCachePath(gluonDBPath, gluonCachePath string) string {
|
|||||||
func (bridge *Bridge) GetFeatureFlagValue(key string) bool {
|
func (bridge *Bridge) GetFeatureFlagValue(key string) bool {
|
||||||
return bridge.unleashService.GetFlagValue(key)
|
return bridge.unleashService.GetFlagValue(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bridge *Bridge) PushObservabilityMetric(metric proton.ObservabilityMetric) {
|
||||||
|
bridge.observabilityService.AddMetric(metric)
|
||||||
|
}
|
||||||
|
|||||||
97
internal/bridge/observability_test.go
Normal file
97
internal/bridge/observability_test.go
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
// Copyright (c) 2024 Proton AG
|
||||||
|
//
|
||||||
|
// This file is part of Proton Mail Bridge.
|
||||||
|
//
|
||||||
|
// Proton Mail Bridge is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// Proton Mail Bridge is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package bridge_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ProtonMail/go-proton-api"
|
||||||
|
"github.com/ProtonMail/go-proton-api/server"
|
||||||
|
"github.com/ProtonMail/proton-bridge/v3/internal/bridge"
|
||||||
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBridge_Observability(t *testing.T) {
|
||||||
|
testMetric := proton.ObservabilityMetric{
|
||||||
|
Name: "test1",
|
||||||
|
Version: 1,
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
Data: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
withEnv(t, func(ctx context.Context, s *server.Server, netCtl *proton.NetCtl, locator bridge.Locator, vaultKey []byte) {
|
||||||
|
throttlePeriod := time.Millisecond * 500
|
||||||
|
observability.ModifyThrottlePeriod(throttlePeriod)
|
||||||
|
|
||||||
|
withBridge(ctx, t, s.GetHostURL(), netCtl, locator, vaultKey, func(bridge *bridge.Bridge, _ *bridge.Mocks) {
|
||||||
|
require.NoError(t, getErr(bridge.LoginFull(ctx, username, password, nil, nil)))
|
||||||
|
|
||||||
|
bridge.PushObservabilityMetric(testMetric)
|
||||||
|
time.Sleep(time.Millisecond * 50) // Wait for the metric to be sent
|
||||||
|
require.Equal(t, 1, len(s.GetObservabilityStatistics().Metrics))
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
time.Sleep(time.Millisecond * 5) // Minor delay between each so our tests aren't flaky
|
||||||
|
bridge.PushObservabilityMetric(testMetric)
|
||||||
|
}
|
||||||
|
// We should still have only 1 metric sent as the throttleDuration has not passed
|
||||||
|
require.Equal(t, 1, len(s.GetObservabilityStatistics().Metrics))
|
||||||
|
|
||||||
|
// Wait for throttle duration to pass; we should have our remaining metrics posted
|
||||||
|
time.Sleep(throttlePeriod)
|
||||||
|
require.Equal(t, 11, len(s.GetObservabilityStatistics().Metrics))
|
||||||
|
|
||||||
|
// Wait for the throttle duration to reset; i.e. so we have enough time to send a request immediately
|
||||||
|
time.Sleep(throttlePeriod)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
time.Sleep(time.Millisecond * 5)
|
||||||
|
bridge.PushObservabilityMetric(testMetric)
|
||||||
|
}
|
||||||
|
// We should only have one additional metric sent immediately
|
||||||
|
require.Equal(t, 12, len(s.GetObservabilityStatistics().Metrics))
|
||||||
|
|
||||||
|
// Wait for the others to be sent
|
||||||
|
time.Sleep(throttlePeriod)
|
||||||
|
require.Equal(t, 21, len(s.GetObservabilityStatistics().Metrics))
|
||||||
|
|
||||||
|
// Spam the endpoint a bit
|
||||||
|
for i := 0; i < 300; i++ {
|
||||||
|
if i < 200 {
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
}
|
||||||
|
bridge.PushObservabilityMetric(testMetric)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure we've sent all metrics
|
||||||
|
time.Sleep(throttlePeriod)
|
||||||
|
|
||||||
|
observabilityStats := s.GetObservabilityStatistics()
|
||||||
|
require.Equal(t, 321, len(observabilityStats.Metrics))
|
||||||
|
|
||||||
|
// Verify that each request had a throttleDuration time difference between each request
|
||||||
|
for i := 0; i < len(observabilityStats.RequestTime)-1; i++ {
|
||||||
|
tOne := observabilityStats.RequestTime[i]
|
||||||
|
tTwo := observabilityStats.RequestTime[i+1]
|
||||||
|
require.True(t, tTwo.Sub(tOne).Abs() > throttlePeriod)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
@ -566,6 +566,7 @@ func (bridge *Bridge) addUserWithVault(
|
|||||||
bridge.serverManager,
|
bridge.serverManager,
|
||||||
&bridgeEventSubscription{b: bridge},
|
&bridgeEventSubscription{b: bridge},
|
||||||
bridge.syncService,
|
bridge.syncService,
|
||||||
|
bridge.observabilityService,
|
||||||
syncSettingsPath,
|
syncSettingsPath,
|
||||||
isNew,
|
isNew,
|
||||||
)
|
)
|
||||||
|
|||||||
279
internal/services/observability/service.go
Normal file
279
internal/services/observability/service.go
Normal file
@ -0,0 +1,279 @@
|
|||||||
|
// Copyright (c) 2024 Proton AG
|
||||||
|
//
|
||||||
|
// This file is part of Proton Mail Bridge.
|
||||||
|
//
|
||||||
|
// Proton Mail Bridge is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// Proton Mail Bridge is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package observability
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ProtonMail/gluon/async"
|
||||||
|
"github.com/ProtonMail/go-proton-api"
|
||||||
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/telemetry"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Non-const for testing.
|
||||||
|
var throttleDuration = 5 * time.Second //nolint:gochecknoglobals
|
||||||
|
|
||||||
|
const (
|
||||||
|
maxStorageSize = 5000
|
||||||
|
maxBatchSize = 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
type client struct {
|
||||||
|
isTelemetryEnabled func(context.Context) bool
|
||||||
|
sendMetrics func(context.Context, proton.ObservabilityBatch) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type Service struct {
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
|
||||||
|
panicHandler async.PanicHandler
|
||||||
|
|
||||||
|
lastDispatch time.Time
|
||||||
|
isDispatchScheduled bool
|
||||||
|
|
||||||
|
signalDataArrived chan struct{}
|
||||||
|
signalDispatch chan struct{}
|
||||||
|
|
||||||
|
log *logrus.Entry
|
||||||
|
|
||||||
|
metricStore []proton.ObservabilityMetric
|
||||||
|
metricStoreLock sync.Mutex
|
||||||
|
|
||||||
|
userClientStore map[string]*client
|
||||||
|
userClientStoreLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewService(ctx context.Context, panicHandler async.PanicHandler) *Service {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
service := &Service{
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
|
||||||
|
panicHandler: panicHandler,
|
||||||
|
|
||||||
|
lastDispatch: time.Now().Add(-throttleDuration),
|
||||||
|
|
||||||
|
signalDataArrived: make(chan struct{}, 1),
|
||||||
|
signalDispatch: make(chan struct{}, 1),
|
||||||
|
|
||||||
|
log: logrus.WithFields(logrus.Fields{"pkg": "observability"}),
|
||||||
|
|
||||||
|
metricStore: make([]proton.ObservabilityMetric, 0),
|
||||||
|
|
||||||
|
userClientStore: make(map[string]*client),
|
||||||
|
}
|
||||||
|
|
||||||
|
return service
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) Run() {
|
||||||
|
s.log.Info("Starting service")
|
||||||
|
go func() {
|
||||||
|
s.start()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// When new data is received, we determine if we can immediately send the request.
|
||||||
|
// First, we check if a dispatch operation is already scheduled. If it is, we do nothing.
|
||||||
|
// If no dispatch is scheduled, we verify if the required time interval has passed since the last send.
|
||||||
|
// If the interval hasn't passed, we schedule the dispatch to occur when the threshold is met.
|
||||||
|
// If the interval has passed, we initiate an immediate dispatch.
|
||||||
|
func (s *Service) start() {
|
||||||
|
defer async.HandlePanic(s.panicHandler)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-s.signalDispatch:
|
||||||
|
s.dispatchData()
|
||||||
|
|
||||||
|
case <-s.signalDataArrived:
|
||||||
|
if s.isDispatchScheduled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if time.Since(s.lastDispatch) <= throttleDuration {
|
||||||
|
s.scheduleDispatch()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s.sendSignal(s.signalDispatch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) dispatchData() {
|
||||||
|
s.isDispatchScheduled = false // Only accessed via a single goroutine, so no mutexes.
|
||||||
|
if !s.haveMetricsAndClients() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a copy of the metrics we want to send and batch them accordingly
|
||||||
|
var numberOfRemainingMetrics int
|
||||||
|
var metricsToSend []proton.ObservabilityMetric
|
||||||
|
|
||||||
|
s.withMetricStoreLock(func() {
|
||||||
|
numberOfMetricsToSend := min(len(s.metricStore), maxBatchSize)
|
||||||
|
metricsToSend = make([]proton.ObservabilityMetric, numberOfMetricsToSend)
|
||||||
|
copy(metricsToSend, s.metricStore[:numberOfMetricsToSend])
|
||||||
|
s.metricStore = s.metricStore[numberOfMetricsToSend:]
|
||||||
|
numberOfRemainingMetrics = len(s.metricStore)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Send them out to the endpoint
|
||||||
|
telemetryEnabled := s.dispatchViaClient(&metricsToSend)
|
||||||
|
|
||||||
|
// If there are more metric updates than the max batch limit and telemetry is enabled for one of the clients
|
||||||
|
// then we immediately schedule another dispatch.
|
||||||
|
if numberOfRemainingMetrics > 0 && telemetryEnabled {
|
||||||
|
s.scheduleDispatch()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// dispatchViaClient - return value tells us whether telemetry is enabled
|
||||||
|
// such that we know whether to schedule another dispatch if more data is present.
|
||||||
|
func (s *Service) dispatchViaClient(metricsToSend *[]proton.ObservabilityMetric) bool {
|
||||||
|
s.log.Info("Sending observability data.")
|
||||||
|
s.userClientStoreLock.Lock()
|
||||||
|
defer s.userClientStoreLock.Unlock()
|
||||||
|
|
||||||
|
for _, value := range s.userClientStore {
|
||||||
|
if !value.isTelemetryEnabled(s.ctx) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := value.sendMetrics(s.ctx, proton.ObservabilityBatch{Metrics: *metricsToSend}); err != nil {
|
||||||
|
s.log.WithError(err).Error("Issue occurred when sending observability data.")
|
||||||
|
} else {
|
||||||
|
s.log.Info("Successfully sent observability data.")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.lastDispatch = time.Now()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
s.log.Info("Could not send observability data. Telemetry is not enabled.")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) scheduleDispatch() {
|
||||||
|
waitTime := throttleDuration - time.Since(s.lastDispatch)
|
||||||
|
if waitTime <= 0 {
|
||||||
|
s.sendSignal(s.signalDispatch)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.log.Info("Scheduling observability data sending")
|
||||||
|
|
||||||
|
s.isDispatchScheduled = true
|
||||||
|
go func() {
|
||||||
|
defer async.HandlePanic(s.panicHandler)
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(waitTime):
|
||||||
|
s.sendSignal(s.signalDispatch)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) AddMetric(metric proton.ObservabilityMetric) {
|
||||||
|
s.withMetricStoreLock(func() {
|
||||||
|
metricStoreLength := len(s.metricStore)
|
||||||
|
if metricStoreLength >= maxStorageSize {
|
||||||
|
s.log.Info("Max metric storage size has been exceeded. Dropping oldest metrics.")
|
||||||
|
|
||||||
|
dropCount := metricStoreLength - maxStorageSize + 1
|
||||||
|
s.metricStore = s.metricStore[dropCount:]
|
||||||
|
}
|
||||||
|
s.metricStore = append(s.metricStore, metric)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.sendSignal(s.signalDataArrived)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) RegisterUserClient(userID string, protonClient *proton.Client, telemetryService *telemetry.Service) {
|
||||||
|
s.log.Info("Registering user client, ID:", userID)
|
||||||
|
|
||||||
|
s.withUserClientStoreLock(func() {
|
||||||
|
s.userClientStore[userID] = &client{
|
||||||
|
isTelemetryEnabled: telemetryService.IsTelemetryEnabled,
|
||||||
|
sendMetrics: protonClient.SendObservabilityBatch,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// There may be a case where we already have metric updates stored, so try to flush;
|
||||||
|
s.sendSignal(s.signalDataArrived)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) DeregisterUserClient(userID string) {
|
||||||
|
s.log.Info("De-registering user client, ID:", userID)
|
||||||
|
|
||||||
|
s.withUserClientStoreLock(func() {
|
||||||
|
delete(s.userClientStore, userID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) Stop() {
|
||||||
|
s.log.Info("Stopping service")
|
||||||
|
|
||||||
|
s.cancel()
|
||||||
|
close(s.signalDataArrived)
|
||||||
|
close(s.signalDispatch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Utility functions below.
|
||||||
|
func (s *Service) haveMetricsAndClients() bool {
|
||||||
|
s.metricStoreLock.Lock()
|
||||||
|
s.userClientStoreLock.Lock()
|
||||||
|
defer s.metricStoreLock.Unlock()
|
||||||
|
defer s.userClientStoreLock.Unlock()
|
||||||
|
|
||||||
|
return len(s.metricStore) > 0 && len(s.userClientStore) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) withUserClientStoreLock(fn func()) {
|
||||||
|
s.userClientStoreLock.Lock()
|
||||||
|
defer s.userClientStoreLock.Unlock()
|
||||||
|
fn()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) withMetricStoreLock(fn func()) {
|
||||||
|
s.metricStoreLock.Lock()
|
||||||
|
defer s.metricStoreLock.Unlock()
|
||||||
|
fn()
|
||||||
|
}
|
||||||
|
|
||||||
|
// We use buffered channels; we shouldn't block them.
|
||||||
|
func (s *Service) sendSignal(channel chan struct{}) {
|
||||||
|
select {
|
||||||
|
case channel <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModifyThrottlePeriod - used for testing.
|
||||||
|
func ModifyThrottlePeriod(duration time.Duration) {
|
||||||
|
throttleDuration = duration
|
||||||
|
}
|
||||||
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/ProtonMail/proton-bridge/v3/internal/events"
|
"github.com/ProtonMail/proton-bridge/v3/internal/events"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
|
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
|
||||||
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/orderedtasks"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/sendrecorder"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
|
||||||
@ -86,6 +87,8 @@ type User struct {
|
|||||||
imapService *imapservice.Service
|
imapService *imapservice.Service
|
||||||
telemetryService *telemetryservice.Service
|
telemetryService *telemetryservice.Service
|
||||||
|
|
||||||
|
observabilityService *observability.Service
|
||||||
|
|
||||||
serviceGroup *orderedtasks.OrderedCancelGroup
|
serviceGroup *orderedtasks.OrderedCancelGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -104,6 +107,7 @@ func New(
|
|||||||
smtpServerManager smtp.ServerManager,
|
smtpServerManager smtp.ServerManager,
|
||||||
eventSubscription events.Subscription,
|
eventSubscription events.Subscription,
|
||||||
syncService syncservice.Regulator,
|
syncService syncservice.Regulator,
|
||||||
|
observabilityService *observability.Service,
|
||||||
syncConfigDir string,
|
syncConfigDir string,
|
||||||
isNew bool,
|
isNew bool,
|
||||||
) (*User, error) {
|
) (*User, error) {
|
||||||
@ -122,6 +126,7 @@ func New(
|
|||||||
smtpServerManager,
|
smtpServerManager,
|
||||||
eventSubscription,
|
eventSubscription,
|
||||||
syncService,
|
syncService,
|
||||||
|
observabilityService,
|
||||||
syncConfigDir,
|
syncConfigDir,
|
||||||
isNew,
|
isNew,
|
||||||
)
|
)
|
||||||
@ -153,6 +158,7 @@ func newImpl(
|
|||||||
smtpServerManager smtp.ServerManager,
|
smtpServerManager smtp.ServerManager,
|
||||||
eventSubscription events.Subscription,
|
eventSubscription events.Subscription,
|
||||||
syncService syncservice.Regulator,
|
syncService syncservice.Regulator,
|
||||||
|
observabilityService *observability.Service,
|
||||||
syncConfigDir string,
|
syncConfigDir string,
|
||||||
isNew bool,
|
isNew bool,
|
||||||
) (*User, error) {
|
) (*User, error) {
|
||||||
@ -215,6 +221,8 @@ func newImpl(
|
|||||||
|
|
||||||
serviceGroup: orderedtasks.NewOrderedCancelGroup(crashHandler),
|
serviceGroup: orderedtasks.NewOrderedCancelGroup(crashHandler),
|
||||||
smtpService: nil,
|
smtpService: nil,
|
||||||
|
|
||||||
|
observabilityService: observabilityService,
|
||||||
}
|
}
|
||||||
|
|
||||||
user.eventService = userevents.NewService(
|
user.eventService = userevents.NewService(
|
||||||
@ -318,6 +326,9 @@ func newImpl(
|
|||||||
// Start Identity Service
|
// Start Identity Service
|
||||||
user.identityService.Start(ctx, user.serviceGroup)
|
user.identityService.Start(ctx, user.serviceGroup)
|
||||||
|
|
||||||
|
// Add user client to observability service
|
||||||
|
observabilityService.RegisterUserClient(user.id, client, user.telemetryService)
|
||||||
|
|
||||||
// Start SMTP Service
|
// Start SMTP Service
|
||||||
if err := user.smtpService.Start(ctx, user.serviceGroup); err != nil {
|
if err := user.smtpService.Start(ctx, user.serviceGroup); err != nil {
|
||||||
return user, fmt.Errorf("failed to start smtp service: %w", err)
|
return user, fmt.Errorf("failed to start smtp service: %w", err)
|
||||||
@ -586,6 +597,9 @@ func (user *User) Logout(ctx context.Context, withAPI bool) error {
|
|||||||
|
|
||||||
user.tasks.CancelAndWait()
|
user.tasks.CancelAndWait()
|
||||||
|
|
||||||
|
// Close user observability service.
|
||||||
|
user.observabilityService.DeregisterUserClient(user.id)
|
||||||
|
|
||||||
// Stop Services
|
// Stop Services
|
||||||
user.serviceGroup.CancelAndWait()
|
user.serviceGroup.CancelAndWait()
|
||||||
|
|
||||||
@ -619,6 +633,9 @@ func (user *User) Close() {
|
|||||||
// Stop any ongoing background tasks.
|
// Stop any ongoing background tasks.
|
||||||
user.tasks.CancelAndWait()
|
user.tasks.CancelAndWait()
|
||||||
|
|
||||||
|
// Close user observability service.
|
||||||
|
user.observabilityService.DeregisterUserClient(user.id)
|
||||||
|
|
||||||
// Stop Services
|
// Stop Services
|
||||||
user.serviceGroup.CancelAndWait()
|
user.serviceGroup.CancelAndWait()
|
||||||
|
|
||||||
|
|||||||
@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/ProtonMail/proton-bridge/v3/internal/certs"
|
"github.com/ProtonMail/proton-bridge/v3/internal/certs"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/events"
|
"github.com/ProtonMail/proton-bridge/v3/internal/events"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/imapservice"
|
||||||
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/observability"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
|
"github.com/ProtonMail/proton-bridge/v3/internal/services/smtp"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/telemetry/mocks"
|
"github.com/ProtonMail/proton-bridge/v3/internal/telemetry/mocks"
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/vault"
|
"github.com/ProtonMail/proton-bridge/v3/internal/vault"
|
||||||
@ -164,6 +165,7 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma
|
|||||||
nullSMTPServerManager,
|
nullSMTPServerManager,
|
||||||
nullEventSubscription,
|
nullEventSubscription,
|
||||||
nil,
|
nil,
|
||||||
|
observability.NewService(context.Background(), nil),
|
||||||
"",
|
"",
|
||||||
true,
|
true,
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user