diff --git a/go.mod b/go.mod index 9bf0fa5b..d6f6a780 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/urfave/cli/v2 v2.20.3 github.com/vmihailenco/msgpack/v5 v5.3.5 - gitlab.protontech.ch/go/liteapi v0.37.2 + gitlab.protontech.ch/go/liteapi v0.38.0 go.uber.org/goleak v1.2.0 golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e golang.org/x/net v0.1.0 diff --git a/go.sum b/go.sum index 33d17dd3..3fcfe3f3 100644 --- a/go.sum +++ b/go.sum @@ -403,8 +403,8 @@ github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsr github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zclconf/go-cty v1.11.0 h1:726SxLdi2SDnjY+BStqB9J1hNp4+2WlzyXLuimibIe0= github.com/zclconf/go-cty v1.11.0/go.mod h1:s9IfD1LK5ccNMSWCVFCE2rJfHiZgi7JijgeWIMfhLvA= -gitlab.protontech.ch/go/liteapi v0.37.2 h1:9oM5knclpye72tFpphvGbqb4uPnC4q1VwFu+WksS9Xg= -gitlab.protontech.ch/go/liteapi v0.37.2/go.mod h1:IM7ADWjgIL2hXopzx0WNamizEuMgM2QZl7QH12FNflk= +gitlab.protontech.ch/go/liteapi v0.38.0 h1:3/8HIihIxBSaLotY8D9oJS1ovCcDN15j+otSd0zO0R8= +gitlab.protontech.ch/go/liteapi v0.38.0/go.mod h1:IM7ADWjgIL2hXopzx0WNamizEuMgM2QZl7QH12FNflk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= diff --git a/internal/user/imap.go b/internal/user/imap.go index e8beab7b..9c429dec 100644 --- a/internal/user/imap.go +++ b/internal/user/imap.go @@ -268,6 +268,8 @@ func (conn *imapConnector) CreateMessage( if messageID, ok, err := conn.sendHash.hasEntryWait(ctx, hash, time.Now().Add(90*time.Second)); err != nil { return imap.Message{}, nil, fmt.Errorf("failed to check send hash: %w", err) } else if ok { + conn.log.WithField("messageID", messageID).Debug("Message already sent") + message, err := conn.client.GetMessage(ctx, messageID) if err != nil { return imap.Message{}, nil, fmt.Errorf("failed to fetch message: %w", err) diff --git a/internal/user/user.go b/internal/user/user.go index 5b8ce9b4..9d89fb62 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "strings" - "sync" "sync/atomic" "time" @@ -73,6 +72,7 @@ type User struct { tasks *xsync.Group abortable async.Abortable goSync func() + goPoll func() syncWorkers int syncBuffer int @@ -183,23 +183,40 @@ func New( }) // Stream events from the API, logging any errors that occur. + // This does nothing until the sync has been marked as complete. // When we receive an API event, we attempt to handle it. // If successful, we update the event ID in the vault. - goStream := user.tasks.Trigger(func(ctx context.Context) { - async.RangeContext(ctx, user.client.NewEventStream(ctx, EventPeriod, EventJitter, user.vault.EventID()), func(event liteapi.Event) { - if err := user.handleAPIEvent(ctx, event); err != nil { - user.log.WithError(err).Error("Failed to handle API event") - } else if err := user.vault.SetEventID(event.EventID); err != nil { - user.log.WithError(err).Error("Failed to update event ID in vault") - } - }) + user.goPoll = user.tasks.PeriodicOrTrigger(EventPeriod, EventJitter, func(ctx context.Context) { + if !user.vault.SyncStatus().IsComplete() { + user.log.Debug("Sync is incomplete, skipping event stream") + return + } + + event, err := user.client.GetEvent(ctx, user.vault.EventID()) + if err != nil { + user.log.WithError(err).Error("Failed to get event") + return + } + + if event.EventID == user.vault.EventID() { + user.log.Debug("No new events") + return + } + + if err := user.handleAPIEvent(ctx, event); err != nil { + user.log.WithError(err).Error("Failed to handle API event") + return + } + + if err := user.vault.SetEventID(event.EventID); err != nil { + user.log.WithError(err).Error("Failed to update event ID in vault") + return + } + + user.log.WithField("eventID", event.EventID).Debug("Updated event ID") }) - // We only ever want to start one event streamer. - var once sync.Once - // When triggered, attempt to sync the user. - // If successful, we start the event streamer if we haven't already. user.goSync = user.tasks.Trigger(func(ctx context.Context) { user.abortable.Do(ctx, func(ctx context.Context) { if !user.vault.SyncStatus().IsComplete() { @@ -207,12 +224,10 @@ func New( return } } - - once.Do(goStream) }) }) - // Trigger an initial sync (if necessary) and start the event stream. + // Trigger an initial sync (if necessary). user.goSync() return user, nil @@ -386,6 +401,8 @@ func (user *User) NewIMAPConnectors() (map[string]connector.Connector, error) { // // nolint:funlen func (user *User) SendMail(authID string, from string, to []string, r io.Reader) error { + defer user.goPoll() + ctx, cancel := context.WithCancel(context.Background()) defer cancel()