// Copyright (c) 2025 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.

package observability

import (
	"context"
	"encoding/json"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/ProtonMail/gluon/async"
	"github.com/ProtonMail/go-proton-api"
	"github.com/ProtonMail/proton-bridge/v3/internal/locations"
	"github.com/ProtonMail/proton-bridge/v3/internal/services/telemetry"
	"github.com/bradenaw/juniper/xslices"
	"github.com/sirupsen/logrus"
)

// throttleDuration is the interval compared against the time elapsed since the
// last dispatch when deciding whether to send immediately or schedule a send.
// Non-const for testing (see ModifyThrottlePeriod).
var throttleDuration = 5 * time.Second //nolint:gochecknoglobals

const (
	// maxStorageSize is the in-memory metric store size beyond which the oldest
	// stored metrics are dropped.
	maxStorageSize = 5000
	// maxBatchSize is the largest number of metrics taken from the store for a
	// single dispatch.
	maxBatchSize = 1000
	// filename is the name of the metric cache file, joined onto the cache
	// directory in WithObservability.
	filename = "metric_cache.json"
)

// client holds the two per-user callbacks the service needs in order to
// dispatch metrics; instances are created in RegisterUserClient.
type client struct {
	// isTelemetryEnabled reports whether telemetry is enabled for the user.
	isTelemetryEnabled func(context.Context) bool
	// sendMetrics submits a batch of metrics on behalf of the user.
	sendMetrics func(context.Context, proton.ObservabilityBatch) error
}

// Sender - interface maps to the observability service methods,
// so we can easily pass them down to relevant components.
type Sender interface { AddMetrics(metrics ...proton.ObservabilityMetric) AddDistinctMetrics(errType DistinctionMetricTypeEnum, metrics ...proton.ObservabilityMetric) AddTimeLimitedMetric(metricType DistinctionMetricTypeEnum, metric proton.ObservabilityMetric) GetEmailClient() string } type BasicSender interface { AddMetrics(metric ...proton.ObservabilityMetric) } type Service struct { ctx context.Context cancel context.CancelFunc panicHandler async.PanicHandler cachePath string lastDispatch time.Time isDispatchScheduled bool wg sync.WaitGroup signalDataArrived chan struct{} signalDispatch chan struct{} log *logrus.Entry metricStore []proton.ObservabilityMetric metricStoreLock sync.Mutex userClientStore map[string]*client userClientStoreLock sync.Mutex distinctionUtility *distinctionUtility } func newService() *Service { return &Service{ ctx: context.Background(), metricStore: make([]proton.ObservabilityMetric, 0), log: logrus.WithFields(logrus.Fields{"pkg": "observability"}), userClientStore: make(map[string]*client), } } // NewTestService initializes a new basic observability service with the required struct fields. // Should only be used for testing. func NewTestService() *Service { return newService() } func WithObservability(locations *locations.Locations, fn func(service *Service) error) error { service := newService() cacheDir, err := locations.ProvideObservabilityMetricsCachePath() if err != nil { service.log.WithError(err).Warn("Could not obtain cache path") return fn(service) } cachePath := filepath.Clean(filepath.Join(cacheDir, filename)) service.cachePath = cachePath service.readCacheFile() defer service.writeCacheFile() return fn(service) } // Initialize sets up the observability Service. If not initialized, the service will remain inactive and emit no metrics. // Should exclusively be called during bridge set-up. 
func (s *Service) Initialize(ctx context.Context, panicHandler async.PanicHandler) { ctx, cancel := context.WithCancel(ctx) s.ctx = ctx s.cancel = cancel s.panicHandler = panicHandler s.lastDispatch = time.Now().Add(-throttleDuration) s.signalDataArrived = make(chan struct{}, 1) s.signalDispatch = make(chan struct{}, 1) s.distinctionUtility = newDistinctionUtility(ctx, panicHandler, s) } // Run starts the observability service goroutine. // The function also sets some utility functions to a helper struct aimed at differentiating the amount of users sending metric updates. func (s *Service) Run(settingsGetter settingsGetter) { if s.log != nil { s.log.Info("Starting service") } s.distinctionUtility.setSettingsGetter(settingsGetter) s.distinctionUtility.runHeartbeat() s.wg.Add(1) go func() { defer s.wg.Done() s.start() }() } // When new data is received, we determine if we can immediately send the request. // First, we check if a dispatch operation is already scheduled. If it is, we do nothing. // If no dispatch is scheduled, we verify if the required time interval has passed since the last send. // If the interval hasn't passed, we schedule the dispatch to occur when the threshold is met. // If the interval has passed, we initiate an immediate dispatch. 
func (s *Service) start() { defer async.HandlePanic(s.panicHandler) for { select { case <-s.ctx.Done(): return case <-s.signalDispatch: s.dispatchData() case <-s.signalDataArrived: if s.isDispatchScheduled { continue } if time.Since(s.lastDispatch) <= throttleDuration { s.scheduleDispatch() continue } s.sendSignal(s.signalDispatch) } } } func (s *Service) readCacheFile() { if s.cachePath == "" { return } file, err := os.Open(s.cachePath) if err != nil { s.log.WithError(err).Info("Unable to open cache file") return } defer func(file *os.File) { if err := file.Close(); err != nil { s.log.WithError(err).Error("Unable to close cache file after read") } }(file) s.withMetricStoreLock(func() { if err = json.NewDecoder(file).Decode(&s.metricStore); err != nil { s.log.WithError(err).Error("Unable to decode cache file") } // Since we omit marshalling the field, we need to explicitly overwrite it. for i := range s.metricStore { s.metricStore[i].ShouldCache = true } }) } func (s *Service) writeCacheFile() { if s.cachePath == "" { return } file, err := os.Create(s.cachePath) if err != nil { s.log.WithError(err).Warn("Unable to create cache file") } defer func(file *os.File) { if err := file.Close(); err != nil { s.log.WithError(err).Error("Unable to close cache file after write") } }(file) s.withMetricStoreLock(func() { metricsToCache := xslices.Filter(s.metricStore, func(m proton.ObservabilityMetric) bool { return m.ShouldCache }) if err = json.NewEncoder(file).Encode(metricsToCache); err != nil { s.log.WithError(err).Error("Unable to encode data to cache file") } }) } func (s *Service) dispatchData() { s.isDispatchScheduled = false // Only accessed via a single goroutine, so no mutexes. 
if !s.haveMetricsAndClients() { return } // Get a copy of the metrics we want to send and batch them accordingly var numberOfRemainingMetrics int var metricsToSend []proton.ObservabilityMetric s.withMetricStoreLock(func() { numberOfMetricsToSend := min(len(s.metricStore), maxBatchSize) metricsToSend = make([]proton.ObservabilityMetric, numberOfMetricsToSend) copy(metricsToSend, s.metricStore[:numberOfMetricsToSend]) s.metricStore = s.metricStore[numberOfMetricsToSend:] numberOfRemainingMetrics = len(s.metricStore) }) // Send them out to the endpoint telemetryEnabled := s.dispatchViaClient(&metricsToSend) // If there are more metric updates than the max batch limit and telemetry is enabled for one of the clients // then we immediately schedule another dispatch. if numberOfRemainingMetrics > 0 && telemetryEnabled { s.scheduleDispatch() } } // dispatchViaClient - return value tells us whether telemetry is enabled // such that we know whether to schedule another dispatch if more data is present. func (s *Service) dispatchViaClient(metricsToSend *[]proton.ObservabilityMetric) bool { s.log.Info("Sending observability data.") s.userClientStoreLock.Lock() defer s.userClientStoreLock.Unlock() for _, value := range s.userClientStore { if !value.isTelemetryEnabled(s.ctx) { continue } if err := value.sendMetrics(s.ctx, proton.ObservabilityBatch{Metrics: *metricsToSend}); err != nil { s.log.WithError(err).Error("Issue occurred when sending observability data.") } else { s.log.Info("Successfully sent observability data.") } s.lastDispatch = time.Now() return true } s.log.Info("Could not send observability data. 
Telemetry is not enabled.") return false } func (s *Service) scheduleDispatch() { waitTime := throttleDuration - time.Since(s.lastDispatch) if waitTime <= 0 { s.sendSignal(s.signalDispatch) return } s.log.Info("Scheduling observability data sending") s.isDispatchScheduled = true go func() { defer async.HandlePanic(s.panicHandler) select { case <-s.ctx.Done(): return case <-time.After(waitTime): s.sendSignal(s.signalDispatch) } }() } func (s *Service) addMetrics(metric ...proton.ObservabilityMetric) { s.withMetricStoreLock(func() { metricStoreLength := len(s.metricStore) if metricStoreLength >= maxStorageSize { s.log.Info("Max metric storage size has been exceeded. Dropping oldest metrics.") dropCount := metricStoreLength - maxStorageSize + 1 s.metricStore = s.metricStore[dropCount:] } s.metricStore = append(s.metricStore, metric...) }) // If the context has been cancelled i.e. the service has been stopped then we should be free to exit. if s.ctx.Err() != nil { return } s.sendSignal(s.signalDataArrived) } func (s *Service) flushMetricsTest() { s.withMetricStoreLock(func() { s.metricStore = make([]proton.ObservabilityMetric, 0) }) } // addMetricsIfClients - will append a metric only if there are authenticated clients // via which we can reach the endpoint. func (s *Service) addMetricsIfClients(metric ...proton.ObservabilityMetric) { hasClients := false s.withUserClientStoreLock(func() { hasClients = len(s.userClientStore) > 0 }) if !hasClients { return } s.addMetrics(metric...) 
} func (s *Service) RegisterUserClient(userID string, protonClient *proton.Client, telemetryService *telemetry.Service, userPlan string) { s.log.Info("Registering user client, ID:", userID) s.withUserClientStoreLock(func() { s.userClientStore[userID] = &client{ isTelemetryEnabled: telemetryService.IsTelemetryEnabled, sendMetrics: protonClient.SendObservabilityBatch, } }) s.distinctionUtility.setUserPlan(userPlan) // There may be a case where we already have metric updates stored, so try to flush; s.sendSignal(s.signalDataArrived) } func (s *Service) DeregisterUserClient(userID string) { s.log.Info("De-registering user client, ID:", userID) s.withUserClientStoreLock(func() { delete(s.userClientStore, userID) }) } func (s *Service) Stop() { s.log.Info("Stopping service") s.cancel() s.wg.Wait() close(s.signalDataArrived) close(s.signalDispatch) } // Utility functions below. func (s *Service) haveMetricsAndClients() bool { s.metricStoreLock.Lock() s.userClientStoreLock.Lock() defer s.metricStoreLock.Unlock() defer s.userClientStoreLock.Unlock() return len(s.metricStore) > 0 && len(s.userClientStore) > 0 } func (s *Service) withUserClientStoreLock(fn func()) { s.userClientStoreLock.Lock() defer s.userClientStoreLock.Unlock() fn() } func (s *Service) withMetricStoreLock(fn func()) { s.metricStoreLock.Lock() defer s.metricStoreLock.Unlock() fn() } // We use buffered channels; we shouldn't block them. func (s *Service) sendSignal(channel chan struct{}) { select { case channel <- struct{}{}: default: } } // ModifyThrottlePeriod - used for testing. func ModifyThrottlePeriod(duration time.Duration) { throttleDuration = duration } func (s *Service) AddMetrics(metrics ...proton.ObservabilityMetric) { s.addMetrics(metrics...) } // AddDistinctMetrics - sends an additional metric related to the user, so we can determine // what number of events come from what number of users. 
// As the binning interval is what allows us to do this we // should not send these if there are no logged-in users at that moment. func (s *Service) AddDistinctMetrics(errType DistinctionMetricTypeEnum, metrics ...proton.ObservabilityMetric) { metrics = s.distinctionUtility.generateDistinctMetrics(errType, metrics...) s.addMetricsIfClients(metrics...) } // AddTimeLimitedMetric - schedules a metric to be sent if a metric of the same type has not been sent within some interval. // The interval is defined in the distinction utility. func (s *Service) AddTimeLimitedMetric(metricType DistinctionMetricTypeEnum, metric proton.ObservabilityMetric) { if !s.distinctionUtility.checkAndUpdateLastSentMap(metricType) { return } s.addMetricsIfClients(metric) } func (s *Service) GetEmailClient() string { return s.distinctionUtility.getEmailClientUserAgent() } // ModifyHeartbeatInterval - should only be used for testing. Resets the heartbeat ticker. func (s *Service) ModifyHeartbeatInterval(duration time.Duration) { s.distinctionUtility.heartbeatTicker.Reset(duration) } func ModifyUserMetricInterval(duration time.Duration) { updateInterval = duration }