GODT-2002: Wait for API events to be applied after send
When we send a message, the send recorder records the sent message. When the client then appends an identical message to the sent folder, the deduplication works and instead returns the message ID of the existing proton message, rather than creating a new message. Gluon is expected to notice that it already has this message ID and perform some deduplication stuff internally. However, it can happen that gluon doesn't yet have this message ID, because we haven't yet received the "Message Created" event from the API. To prevent this, we poll the events after send and wait for all new events to be applied. There's still a chance that the event wasn't generated yet on the API side. Not sure what we can do about this.
This commit is contained in:
@ -72,8 +72,9 @@ type User struct {
|
||||
|
||||
tasks *async.Group
|
||||
abortable async.Abortable
|
||||
pollCh chan chan struct{}
|
||||
goPoll func(bool)
|
||||
goSync func()
|
||||
goPoll func()
|
||||
|
||||
syncWorkers int
|
||||
syncBuffer int
|
||||
@ -130,7 +131,8 @@ func New(
|
||||
|
||||
reporter: reporter,
|
||||
|
||||
tasks: async.NewGroup(context.Background(), crashHandler),
|
||||
tasks: async.NewGroup(context.Background(), crashHandler),
|
||||
pollCh: make(chan chan struct{}),
|
||||
|
||||
syncWorkers: syncWorkers,
|
||||
syncBuffer: syncBuffer,
|
||||
@ -166,16 +168,49 @@ func New(
|
||||
// This does nothing until the sync has been marked as complete.
|
||||
// When we receive an API event, we attempt to handle it.
|
||||
// If successful, we update the event ID in the vault.
|
||||
user.goPoll = user.tasks.PeriodicOrTrigger(EventPeriod, EventJitter, func(ctx context.Context) {
|
||||
user.log.Debug("Event poll triggered")
|
||||
user.tasks.Once(func(ctx context.Context) {
|
||||
ticker := liteapi.NewTicker(EventPeriod, EventJitter)
|
||||
defer ticker.Stop()
|
||||
|
||||
if !user.vault.SyncStatus().IsComplete() {
|
||||
user.log.Debug("Sync is incomplete, skipping event poll")
|
||||
} else if err := user.doEventPoll(ctx); err != nil {
|
||||
user.log.WithError(err).Error("Failed to poll events")
|
||||
for {
|
||||
var doneCh chan struct{}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case doneCh = <-user.pollCh:
|
||||
// ...
|
||||
|
||||
case <-ticker.C:
|
||||
// ...
|
||||
}
|
||||
|
||||
user.log.Debug("Event poll triggered")
|
||||
|
||||
if !user.vault.SyncStatus().IsComplete() {
|
||||
user.log.Debug("Sync is incomplete, skipping event poll")
|
||||
} else if err := user.doEventPoll(ctx); err != nil {
|
||||
user.log.WithError(err).Error("Failed to poll events")
|
||||
}
|
||||
|
||||
if doneCh != nil {
|
||||
close(doneCh)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// When triggered, poll the API for events, optionally blocking until the poll is complete.
|
||||
user.goPoll = func(wait bool) {
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
go func() { user.pollCh <- doneCh }()
|
||||
|
||||
if wait {
|
||||
<-doneCh
|
||||
}
|
||||
}
|
||||
|
||||
// When triggered, attempt to sync the user.
|
||||
user.goSync = user.tasks.Trigger(func(ctx context.Context) {
|
||||
user.log.Debug("Sync triggered")
|
||||
@ -376,7 +411,7 @@ func (user *User) NewIMAPConnectors() (map[string]connector.Connector, error) {
|
||||
//
|
||||
// nolint:funlen
|
||||
func (user *User) SendMail(authID string, from string, to []string, r io.Reader) error {
|
||||
defer user.goPoll()
|
||||
defer user.goPoll(true)
|
||||
|
||||
if len(to) == 0 {
|
||||
return ErrInvalidRecipient
|
||||
|
||||
Reference in New Issue
Block a user