forked from Silverfish/proton-bridge
fix(GODT-3124): Race conditions reported by race check
This commit is contained in:
@ -19,15 +19,13 @@ package imapservice
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ProtonMail/proton-bridge/v3/internal/events"
|
"github.com/ProtonMail/proton-bridge/v3/internal/events"
|
||||||
)
|
)
|
||||||
|
|
||||||
type syncReporter struct {
|
type syncData struct {
|
||||||
userID string
|
|
||||||
eventPublisher events.EventPublisher
|
|
||||||
|
|
||||||
start time.Time
|
start time.Time
|
||||||
total int64
|
total int64
|
||||||
count int64
|
count int64
|
||||||
@ -36,8 +34,25 @@ type syncReporter struct {
|
|||||||
freq time.Duration
|
freq time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type syncReporter struct {
|
||||||
|
userID string
|
||||||
|
eventPublisher events.EventPublisher
|
||||||
|
|
||||||
|
dataLock sync.Mutex
|
||||||
|
data syncData
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rep *syncReporter) withData(f func(s *syncData)) {
|
||||||
|
rep.dataLock.Lock()
|
||||||
|
defer rep.dataLock.Unlock()
|
||||||
|
|
||||||
|
f(&rep.data)
|
||||||
|
}
|
||||||
|
|
||||||
func (rep *syncReporter) OnStart(ctx context.Context) {
|
func (rep *syncReporter) OnStart(ctx context.Context) {
|
||||||
rep.start = time.Now()
|
rep.withData(func(s *syncData) {
|
||||||
|
s.start = time.Now()
|
||||||
|
})
|
||||||
rep.eventPublisher.PublishEvent(ctx, events.SyncStarted{UserID: rep.userID})
|
rep.eventPublisher.PublishEvent(ctx, events.SyncStarted{UserID: rep.userID})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,35 +70,38 @@ func (rep *syncReporter) OnError(ctx context.Context, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rep *syncReporter) OnProgress(ctx context.Context, delta int64) {
|
func (rep *syncReporter) OnProgress(ctx context.Context, delta int64) {
|
||||||
rep.count += delta
|
rep.withData(func(s *syncData) {
|
||||||
|
s.count += delta
|
||||||
|
var progress float64
|
||||||
|
var remaining time.Duration
|
||||||
|
|
||||||
var progress float64
|
// It's possible for count to be bigger or smaller than total depending on when the sync begins and whether new
|
||||||
var remaining time.Duration
|
// messages are added/removed during this period. When this happens just limited the progress to 100%.
|
||||||
|
if s.count > s.total {
|
||||||
|
progress = 1
|
||||||
|
} else {
|
||||||
|
progress = float64(s.count) / float64(s.total)
|
||||||
|
remaining = time.Since(s.start) * time.Duration(s.total-(s.count+1)) / time.Duration(s.count+1)
|
||||||
|
}
|
||||||
|
|
||||||
// It's possible for count to be bigger or smaller than total depending on when the sync begins and whether new
|
if time.Since(s.last) > s.freq {
|
||||||
// messages are added/removed during this period. When this happens just limited the progress to 100%.
|
rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{
|
||||||
if rep.count > rep.total {
|
UserID: rep.userID,
|
||||||
progress = 1
|
Progress: progress,
|
||||||
} else {
|
Elapsed: time.Since(s.start),
|
||||||
progress = float64(rep.count) / float64(rep.total)
|
Remaining: remaining,
|
||||||
remaining = time.Since(rep.start) * time.Duration(rep.total-(rep.count+1)) / time.Duration(rep.count+1)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
if time.Since(rep.last) > rep.freq {
|
s.last = time.Now()
|
||||||
rep.eventPublisher.PublishEvent(ctx, events.SyncProgress{
|
}
|
||||||
UserID: rep.userID,
|
})
|
||||||
Progress: progress,
|
|
||||||
Elapsed: time.Since(rep.start),
|
|
||||||
Remaining: remaining,
|
|
||||||
})
|
|
||||||
|
|
||||||
rep.last = time.Now()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rep *syncReporter) InitializeProgressCounter(_ context.Context, current int64, total int64) {
|
func (rep *syncReporter) InitializeProgressCounter(_ context.Context, current int64, total int64) {
|
||||||
rep.count = current
|
rep.withData(func(s *syncData) {
|
||||||
rep.total = total
|
s.count = current
|
||||||
|
s.total = total
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSyncReporter(userID string, eventsPublisher events.EventPublisher, freq time.Duration) *syncReporter {
|
func newSyncReporter(userID string, eventsPublisher events.EventPublisher, freq time.Duration) *syncReporter {
|
||||||
@ -91,7 +109,9 @@ func newSyncReporter(userID string, eventsPublisher events.EventPublisher, freq
|
|||||||
userID: userID,
|
userID: userID,
|
||||||
eventPublisher: eventsPublisher,
|
eventPublisher: eventsPublisher,
|
||||||
|
|
||||||
start: time.Now(),
|
data: syncData{
|
||||||
freq: freq,
|
start: time.Now(),
|
||||||
|
freq: freq,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,6 +20,7 @@ package tests
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -29,6 +30,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type heartbeatRecorder struct {
|
type heartbeatRecorder struct {
|
||||||
|
lock sync.Mutex
|
||||||
heartbeat telemetry.HeartbeatData
|
heartbeat telemetry.HeartbeatData
|
||||||
bridge *bridge.Bridge
|
bridge *bridge.Bridge
|
||||||
reject bool
|
reject bool
|
||||||
@ -74,10 +76,19 @@ func (hb *heartbeatRecorder) SendHeartbeat(_ context.Context, metrics *telemetry
|
|||||||
if hb.reject {
|
if hb.reject {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
hb.lock.Lock()
|
||||||
|
defer hb.lock.Unlock()
|
||||||
hb.heartbeat = *metrics
|
hb.heartbeat = *metrics
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (hb *heartbeatRecorder) GetRecordedHeartbeat() telemetry.HeartbeatData {
|
||||||
|
hb.lock.Lock()
|
||||||
|
defer hb.lock.Unlock()
|
||||||
|
|
||||||
|
return hb.heartbeat
|
||||||
|
}
|
||||||
|
|
||||||
func (hb *heartbeatRecorder) SetLastHeartbeatSent(timestamp time.Time) error {
|
func (hb *heartbeatRecorder) SetLastHeartbeatSent(timestamp time.Time) error {
|
||||||
if hb.bridge == nil {
|
if hb.bridge == nil {
|
||||||
return errors.New("no bridge initialized")
|
return errors.New("no bridge initialized")
|
||||||
|
|||||||
@ -43,7 +43,7 @@ func (s *scenario) bridgeSendsTheFollowingHeartbeat(text *godog.DocString) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return matchHeartbeat(s.t.heartbeat.heartbeat, wantHeartbeat)
|
return matchHeartbeat(s.t.heartbeat.GetRecordedHeartbeat(), wantHeartbeat)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *scenario) bridgeNeedsToSendHeartbeat() error {
|
func (s *scenario) bridgeNeedsToSendHeartbeat() error {
|
||||||
|
|||||||
Reference in New Issue
Block a user