proton-bridge/internal/services/observability/service.go

// Copyright (c) 2025 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.

package observability

import (
	"context"
	"sync"
	"time"

	"github.com/ProtonMail/gluon/async"
	"github.com/ProtonMail/go-proton-api"
	"github.com/ProtonMail/proton-bridge/v3/internal/services/telemetry"
	"github.com/sirupsen/logrus"
)

// Non-const for testing.
var throttleDuration = 5 * time.Second //nolint:gochecknoglobals

const (
	maxStorageSize = 5000
	maxBatchSize   = 1000
)
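
// client holds the per-user callbacks the service needs to dispatch metrics:
// a check for whether telemetry is enabled and the API call that sends a batch.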
type client struct {
	isTelemetryEnabled func(context.Context) bool
	sendMetrics        func(context.Context, proton.ObservabilityBatch) error
}

// Sender - interface that maps to the observability service methods,
// so we can easily pass them down to the relevant components.
type Sender interface {
	AddMetrics(metrics ...proton.ObservabilityMetric)
	AddDistinctMetrics(errType DistinctionMetricTypeEnum, metrics ...proton.ObservabilityMetric)
	AddTimeLimitedMetric(metricType DistinctionMetricTypeEnum, metric proton.ObservabilityMetric)
	GetEmailClient() string
}
type Service struct {
	ctx                 context.Context
	cancel              context.CancelFunc
	panicHandler        async.PanicHandler
	lastDispatch        time.Time
	isDispatchScheduled bool
	signalDataArrived   chan struct{}
	signalDispatch      chan struct{}
	log                 *logrus.Entry
	metricStore         []proton.ObservabilityMetric
	metricStoreLock     sync.Mutex
	userClientStore     map[string]*client
	userClientStoreLock sync.Mutex
	distinctionUtility  *distinctionUtility
}
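
// NewService creates an observability service bound to the given context.
// A typical lifecycle (simplified sketch; the settingsGetter, client, metric and
// user values are assumed to come from the caller):
//
//	svc := NewService(ctx, panicHandler)
//	svc.Run(settingsGetter)
//	svc.RegisterUserClient(userID, protonClient, telemetryService, userPlan)
//	svc.AddMetrics(metric)
//	// ...
//	svc.Stop()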
func NewService(ctx context.Context, panicHandler async.PanicHandler) *Service {
	ctx, cancel := context.WithCancel(ctx)

	service := &Service{
		ctx:               ctx,
		cancel:            cancel,
		panicHandler:      panicHandler,
		lastDispatch:      time.Now().Add(-throttleDuration),
		signalDataArrived: make(chan struct{}, 1),
		signalDispatch:    make(chan struct{}, 1),
		log:               logrus.WithFields(logrus.Fields{"pkg": "observability"}),
		metricStore:       make([]proton.ObservabilityMetric, 0),
		userClientStore:   make(map[string]*client),
	}

	service.distinctionUtility = newDistinctionUtility(ctx, panicHandler, service)

	return service
}

// Run starts the observability service goroutine.
// It also wires the settings getter into a helper struct used to distinguish
// how many users are sending metric updates.
func (s *Service) Run(settingsGetter settingsGetter) {
	s.log.Info("Starting service")
	s.distinctionUtility.setSettingsGetter(settingsGetter)
	s.distinctionUtility.runHeartbeat()
	go func() {
		s.start()
	}()
}

// start is the dispatch loop. When new data arrives, we determine whether we can
// send the request immediately. First, we check whether a dispatch operation is
// already scheduled; if it is, we do nothing. If no dispatch is scheduled, we check
// whether the throttle interval has passed since the last send. If it hasn't, we
// schedule the dispatch to occur once the threshold is met; otherwise we initiate
// an immediate dispatch.
func (s *Service) start() {
	defer async.HandlePanic(s.panicHandler)

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-s.signalDispatch:
			s.dispatchData()
		case <-s.signalDataArrived:
			if s.isDispatchScheduled {
				continue
			}
			if time.Since(s.lastDispatch) <= throttleDuration {
				s.scheduleDispatch()
				continue
			}
			s.sendSignal(s.signalDispatch)
		}
	}
}
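
// dispatchData takes up to maxBatchSize metrics from the store and attempts to
// send them through one of the registered user clients.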
func (s *Service) dispatchData() {
	s.isDispatchScheduled = false // Only accessed via a single goroutine, so no mutexes.
	if !s.haveMetricsAndClients() {
		return
	}

	// Get a copy of the metrics we want to send and batch them accordingly.
	var numberOfRemainingMetrics int
	var metricsToSend []proton.ObservabilityMetric
	s.withMetricStoreLock(func() {
		numberOfMetricsToSend := min(len(s.metricStore), maxBatchSize)
		metricsToSend = make([]proton.ObservabilityMetric, numberOfMetricsToSend)
		copy(metricsToSend, s.metricStore[:numberOfMetricsToSend])
		s.metricStore = s.metricStore[numberOfMetricsToSend:]
		numberOfRemainingMetrics = len(s.metricStore)
	})

	// Send them out to the endpoint.
	telemetryEnabled := s.dispatchViaClient(&metricsToSend)

	// If metrics remain after this batch (i.e., the store held more than maxBatchSize)
	// and telemetry is enabled for at least one client, schedule another dispatch immediately.
	if numberOfRemainingMetrics > 0 && telemetryEnabled {
		s.scheduleDispatch()
	}
}

// dispatchViaClient sends the batch through the first registered client found with
// telemetry enabled. The return value reports whether any such client exists, so the
// caller knows whether to schedule another dispatch if more data is present.
func (s *Service) dispatchViaClient(metricsToSend *[]proton.ObservabilityMetric) bool {
	s.log.Info("Sending observability data.")
	s.userClientStoreLock.Lock()
	defer s.userClientStoreLock.Unlock()

	for _, value := range s.userClientStore {
		if !value.isTelemetryEnabled(s.ctx) {
			continue
		}
		if err := value.sendMetrics(s.ctx, proton.ObservabilityBatch{Metrics: *metricsToSend}); err != nil {
			s.log.WithError(err).Error("Issue occurred when sending observability data.")
		} else {
			s.log.Info("Successfully sent observability data.")
		}
		s.lastDispatch = time.Now()
		return true
	}

	s.log.Info("Could not send observability data. Telemetry is not enabled.")
	return false
}
func (s *Service) scheduleDispatch() {
	waitTime := throttleDuration - time.Since(s.lastDispatch)
	if waitTime <= 0 {
		s.sendSignal(s.signalDispatch)
		return
	}

	s.log.Info("Scheduling observability data sending")
	s.isDispatchScheduled = true

	go func() {
		defer async.HandlePanic(s.panicHandler)
		select {
		case <-s.ctx.Done():
			return
		case <-time.After(waitTime):
			s.sendSignal(s.signalDispatch)
		}
	}()
}
func (s *Service) addMetrics(metric ...proton.ObservabilityMetric) {
	s.withMetricStoreLock(func() {
		metricStoreLength := len(s.metricStore)
		if metricStoreLength >= maxStorageSize {
			s.log.Info("Max metric storage size has been exceeded. Dropping oldest metrics.")
			dropCount := metricStoreLength - maxStorageSize + 1
			s.metricStore = s.metricStore[dropCount:]
		}
		s.metricStore = append(s.metricStore, metric...)
	})

	// If the context has been cancelled, i.e. the service has been stopped, we can
	// exit without signalling.
	if s.ctx.Err() != nil {
		return
	}

	s.sendSignal(s.signalDataArrived)
}

// addMetricsIfClients - appends metrics only if there are authenticated clients
// through which we can reach the endpoint.
func (s *Service) addMetricsIfClients(metric ...proton.ObservabilityMetric) {
	hasClients := false
	s.withUserClientStoreLock(func() {
		hasClients = len(s.userClientStore) > 0
	})
	if !hasClients {
		return
	}

	s.addMetrics(metric...)
}
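
// RegisterUserClient stores the user's API client and telemetry check so the service
// can dispatch metrics on the user's behalf, and records the user's plan for the
// distinction utility.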
func (s *Service) RegisterUserClient(userID string, protonClient *proton.Client, telemetryService *telemetry.Service, userPlan string) {
	s.log.Info("Registering user client, ID:", userID)
	s.withUserClientStoreLock(func() {
		s.userClientStore[userID] = &client{
			isTelemetryEnabled: telemetryService.IsTelemetryEnabled,
			sendMetrics:        protonClient.SendObservabilityBatch,
		}
	})

	s.distinctionUtility.setUserPlan(userPlan)

	// There may be a case where we already have metric updates stored, so try to flush.
	s.sendSignal(s.signalDataArrived)
}
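
// DeregisterUserClient removes the user's API client from the store.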
func (s *Service) DeregisterUserClient(userID string) {
	s.log.Info("De-registering user client, ID:", userID)
	s.withUserClientStoreLock(func() {
		delete(s.userClientStore, userID)
	})
}
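
// Stop cancels the service context and closes the signalling channels.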
func (s *Service) Stop() {
	s.log.Info("Stopping service")
	s.cancel()
	close(s.signalDataArrived)
	close(s.signalDispatch)
}

// Utility functions below.

func (s *Service) haveMetricsAndClients() bool {
	s.metricStoreLock.Lock()
	s.userClientStoreLock.Lock()
	defer s.metricStoreLock.Unlock()
	defer s.userClientStoreLock.Unlock()

	return len(s.metricStore) > 0 && len(s.userClientStore) > 0
}

func (s *Service) withUserClientStoreLock(fn func()) {
	s.userClientStoreLock.Lock()
	defer s.userClientStoreLock.Unlock()

	fn()
}

func (s *Service) withMetricStoreLock(fn func()) {
	s.metricStoreLock.Lock()
	defer s.metricStoreLock.Unlock()

	fn()
}

// sendSignal performs a non-blocking send. The signal channels are buffered with
// capacity one, so if a signal is already pending we simply drop the new one.
func (s *Service) sendSignal(channel chan struct{}) {
	select {
	case channel <- struct{}{}:
	default:
	}
}

// ModifyThrottlePeriod - used for testing.
func ModifyThrottlePeriod(duration time.Duration) {
	throttleDuration = duration
}
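
// AddMetrics appends metrics to the store and signals the dispatch loop.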
func (s *Service) AddMetrics(metrics ...proton.ObservabilityMetric) {
	s.addMetrics(metrics...)
}

// AddDistinctMetrics - sends an additional user-related metric alongside the given
// ones, so we can determine how many events come from how many users. Since the
// binning interval is what makes this possible, these metrics are not sent if there
// are no logged-in users at that moment.
func (s *Service) AddDistinctMetrics(errType DistinctionMetricTypeEnum, metrics ...proton.ObservabilityMetric) {
	metrics = s.distinctionUtility.generateDistinctMetrics(errType, metrics...)
	s.addMetricsIfClients(metrics...)
}

// AddTimeLimitedMetric - adds a metric only if no metric of the same type has been
// sent within a given interval. The interval is defined in the distinction utility.
func (s *Service) AddTimeLimitedMetric(metricType DistinctionMetricTypeEnum, metric proton.ObservabilityMetric) {
	if !s.distinctionUtility.checkAndUpdateLastSentMap(metricType) {
		return
	}

	s.addMetricsIfClients(metric)
}
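
// GetEmailClient returns the email client user agent tracked by the distinction utility.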
func (s *Service) GetEmailClient() string {
	return s.distinctionUtility.getEmailClientUserAgent()
}

// ModifyHeartbeatInterval - should only be used for testing. Resets the heartbeat ticker.
func (s *Service) ModifyHeartbeatInterval(duration time.Duration) {
	s.distinctionUtility.heartbeatTicker.Reset(duration)
}
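
// ModifyUserMetricInterval - should only be used for testing. Overrides the user
// metric update interval.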
func ModifyUserMetricInterval(duration time.Duration) {
	updateInterval = duration
}