From a213b48f93b59f023f044a1cd4706c9e5ea11b18 Mon Sep 17 00:00:00 2001 From: James Houlahan Date: Mon, 31 Oct 2022 17:36:50 +0100 Subject: [PATCH] GODT-2002: Poll after SMTP send After sending, a client might append to the sent folder over IMAP. In this case, we perform deduplication and return the message ID of the sent message. However, if we haven't already processed this message in gluon, it doesn't work as expected. This change polls the event stream immediately after send. Note that it doesn't wait for these events to be processed; that should be done in a follow-up commit. --- go.mod | 2 +- go.sum | 4 ++-- internal/user/imap.go | 2 ++ internal/user/user.go | 49 +++++++++++++++++++++++++++++-------------- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 9bf0fa5b..d6f6a780 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/urfave/cli/v2 v2.20.3 github.com/vmihailenco/msgpack/v5 v5.3.5 - gitlab.protontech.ch/go/liteapi v0.37.2 + gitlab.protontech.ch/go/liteapi v0.38.0 go.uber.org/goleak v1.2.0 golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e golang.org/x/net v0.1.0 diff --git a/go.sum b/go.sum index 33d17dd3..3fcfe3f3 100644 --- a/go.sum +++ b/go.sum @@ -403,8 +403,8 @@ github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsr github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zclconf/go-cty v1.11.0 h1:726SxLdi2SDnjY+BStqB9J1hNp4+2WlzyXLuimibIe0= github.com/zclconf/go-cty v1.11.0/go.mod h1:s9IfD1LK5ccNMSWCVFCE2rJfHiZgi7JijgeWIMfhLvA= -gitlab.protontech.ch/go/liteapi v0.37.2 h1:9oM5knclpye72tFpphvGbqb4uPnC4q1VwFu+WksS9Xg= -gitlab.protontech.ch/go/liteapi v0.37.2/go.mod h1:IM7ADWjgIL2hXopzx0WNamizEuMgM2QZl7QH12FNflk= +gitlab.protontech.ch/go/liteapi v0.38.0 h1:3/8HIihIxBSaLotY8D9oJS1ovCcDN15j+otSd0zO0R8= +gitlab.protontech.ch/go/liteapi v0.38.0/go.mod h1:IM7ADWjgIL2hXopzx0WNamizEuMgM2QZl7QH12FNflk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= diff --git a/internal/user/imap.go b/internal/user/imap.go index e8beab7b..9c429dec 100644 --- a/internal/user/imap.go +++ b/internal/user/imap.go @@ -268,6 +268,8 @@ func (conn *imapConnector) CreateMessage( if messageID, ok, err := conn.sendHash.hasEntryWait(ctx, hash, time.Now().Add(90*time.Second)); err != nil { return imap.Message{}, nil, fmt.Errorf("failed to check send hash: %w", err) } else if ok { + conn.log.WithField("messageID", messageID).Debug("Message already sent") + message, err := conn.client.GetMessage(ctx, messageID) if err != nil { return imap.Message{}, nil, fmt.Errorf("failed to fetch message: %w", err) diff --git a/internal/user/user.go b/internal/user/user.go index 5b8ce9b4..9d89fb62 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "strings" - "sync" "sync/atomic" "time" @@ -73,6 +72,7 @@ type User struct { tasks *xsync.Group abortable async.Abortable goSync func() + goPoll func() syncWorkers int syncBuffer int @@ -183,23 +183,40 @@ func New( }) // Stream events from the API, logging any errors that occur. + // This does nothing until the sync has been marked as complete. // When we receive an API event, we attempt to handle it. // If successful, we update the event ID in the vault. - goStream := user.tasks.Trigger(func(ctx context.Context) { - async.RangeContext(ctx, user.client.NewEventStream(ctx, EventPeriod, EventJitter, user.vault.EventID()), func(event liteapi.Event) { - if err := user.handleAPIEvent(ctx, event); err != nil { - user.log.WithError(err).Error("Failed to handle API event") - } else if err := user.vault.SetEventID(event.EventID); err != nil { - user.log.WithError(err).Error("Failed to update event ID in vault") - } - }) + user.goPoll = user.tasks.PeriodicOrTrigger(EventPeriod, EventJitter, func(ctx context.Context) { + if !user.vault.SyncStatus().IsComplete() { + user.log.Debug("Sync is incomplete, skipping event stream") + return + } + + event, err := user.client.GetEvent(ctx, user.vault.EventID()) + if err != nil { + user.log.WithError(err).Error("Failed to get event") + return + } + + if event.EventID == user.vault.EventID() { + user.log.Debug("No new events") + return + } + + if err := user.handleAPIEvent(ctx, event); err != nil { + user.log.WithError(err).Error("Failed to handle API event") + return + } + + if err := user.vault.SetEventID(event.EventID); err != nil { + user.log.WithError(err).Error("Failed to update event ID in vault") + return + } + + user.log.WithField("eventID", event.EventID).Debug("Updated event ID") }) - // We only ever want to start one event streamer. - var once sync.Once - // When triggered, attempt to sync the user. - // If successful, we start the event streamer if we haven't already. user.goSync = user.tasks.Trigger(func(ctx context.Context) { user.abortable.Do(ctx, func(ctx context.Context) { if !user.vault.SyncStatus().IsComplete() { @@ -207,12 +224,10 @@ func New( return } } - - once.Do(goStream) }) }) - // Trigger an initial sync (if necessary) and start the event stream. + // Trigger an initial sync (if necessary). user.goSync() return user, nil @@ -386,6 +401,8 @@ func (user *User) NewIMAPConnectors() (map[string]connector.Connector, error) { // // nolint:funlen func (user *User) SendMail(authID string, from string, to []string, r io.Reader) error { + defer user.goPoll() + ctx, cancel := context.WithCancel(context.Background()) defer cancel()