feat(GODT-2500): Reorganise async methods.

This commit is contained in:
Jakub
2023-03-30 17:50:40 +02:00
parent ec92c918cd
commit de5fd07a22
42 changed files with 136 additions and 511 deletions

View File

@ -25,8 +25,8 @@ import (
"net/http"
"github.com/ProtonMail/gluon"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/gopenpgp/v2/crypto"
@ -225,7 +225,7 @@ func (user *User) handleCreateAddressEvent(ctx context.Context, event proton.Add
user.updateCh[event.Address.ID] = user.updateCh[primAddr.ID]
case vault.SplitMode:
user.updateCh[event.Address.ID] = queue.NewQueuedChannel[imap.Update](0, 0, user.panicHandler)
user.updateCh[event.Address.ID] = async.NewQueuedChannel[imap.Update](0, 0, user.panicHandler)
}
user.eventCh.Enqueue(events.UserAddressCreated{
@ -284,7 +284,7 @@ func (user *User) handleUpdateAddressEvent(_ context.Context, event proton.Addre
user.updateCh[event.Address.ID] = user.updateCh[primAddr.ID]
case vault.SplitMode:
user.updateCh[event.Address.ID] = queue.NewQueuedChannel[imap.Update](0, 0, user.panicHandler)
user.updateCh[event.Address.ID] = async.NewQueuedChannel[imap.Update](0, 0, user.panicHandler)
}
user.eventCh.Enqueue(events.UserAddressEnabled{

View File

@ -29,6 +29,7 @@ import (
"strings"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/gluon/rfc5322"
"github.com/ProtonMail/gluon/rfc822"
@ -48,7 +49,7 @@ import (
// sendMail sends an email from the given address to the given recipients.
func (user *User) sendMail(authID string, from string, to []string, r io.Reader) error {
defer user.handlePanic()
defer async.HandlePanic(user.panicHandler)
return safe.RLockRet(func() error {
ctx, cancel := context.WithCancel(context.Background())
@ -392,7 +393,7 @@ func (user *User) createAttachments(
}
keys, err := parallel.MapContext(ctx, runtime.NumCPU(), attachments, func(ctx context.Context, att message.Attachment) (attKey, error) {
defer user.handlePanic()
defer async.HandlePanic(user.panicHandler)
logrus.WithFields(logrus.Fields{
"name": logging.Sensitive(att.Name),
@ -471,7 +472,7 @@ func (user *User) getRecipients(
})
prefs, err := parallel.MapContext(ctx, runtime.NumCPU(), addresses, func(ctx context.Context, recipient string) (proton.SendPreferences, error) {
defer user.handlePanic()
defer async.HandlePanic(user.panicHandler)
pubKeys, recType, err := client.GetPublicKeys(ctx, recipient)
if err != nil {

View File

@ -25,9 +25,9 @@ import (
"strings"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/logging"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/gopenpgp/v2/crypto"
@ -184,7 +184,7 @@ func (user *User) sync(ctx context.Context) error {
}
// nolint:exhaustive
func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh ...*queue.QueuedChannel[imap.Update]) error {
func syncLabels(ctx context.Context, apiLabels map[string]proton.Label, updateCh ...*async.QueuedChannel[imap.Update]) error {
var updates []imap.Update
// Create placeholder Folders/Labels mailboxes with the \Noselect attribute.
@ -251,8 +251,8 @@ func (user *User) syncMessages(
vault *vault.User,
apiLabels map[string]proton.Label,
addrKRs map[string]*crypto.KeyRing,
updateCh map[string]*queue.QueuedChannel[imap.Update],
eventCh *queue.QueuedChannel[events.Event],
updateCh map[string]*async.QueuedChannel[imap.Update],
eventCh *async.QueuedChannel[events.Event],
maxSyncMemory uint64,
) error {
ctx, cancel := context.WithCancel(ctx)
@ -456,7 +456,7 @@ func (user *User) syncMessages(
}
result, err := parallel.MapContext(ctx, maxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) {
defer user.handlePanic()
defer async.HandlePanic(user.panicHandler)
var result proton.FullMessage
@ -511,7 +511,7 @@ func (user *User) syncMessages(
logrus.Debugf("Build request: %v of %v count=%v", index, len(chunks), len(chunk))
result, err := parallel.MapContext(ctx, maxMessagesInParallel, chunk, func(ctx context.Context, msg proton.FullMessage) (*buildRes, error) {
defer user.handlePanic()
defer async.HandlePanic(user.panicHandler)
return buildRFC822(apiLabels, msg, addrKRs[msg.AddressID], new(bytes.Buffer)), nil
})
@ -538,7 +538,7 @@ func (user *User) syncMessages(
type updateTargetInfo struct {
queueIndex int
ch *queue.QueuedChannel[imap.Update]
ch *async.QueuedChannel[imap.Update]
}
pendingUpdates := make([][]*imap.MessageCreated, len(updateCh))

View File

@ -20,13 +20,13 @@ package user
import (
"time"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/proton-bridge/v3/internal/events"
)
type syncReporter struct {
userID string
eventCh *queue.QueuedChannel[events.Event]
eventCh *async.QueuedChannel[events.Event]
start time.Time
total int
@ -36,7 +36,7 @@ type syncReporter struct {
freq time.Duration
}
func newSyncReporter(userID string, eventCh *queue.QueuedChannel[events.Event], total int, freq time.Duration) *syncReporter {
func newSyncReporter(userID string, eventCh *async.QueuedChannel[events.Event], total int, freq time.Duration) *syncReporter {
return &syncReporter{
userID: userID,
eventCh: eventCh,

View File

@ -23,8 +23,8 @@ import (
"runtime"
"strings"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/proton-bridge/v3/internal/async"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

View File

@ -29,13 +29,12 @@ import (
"sync/atomic"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/connector"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/queue"
"github.com/ProtonMail/gluon/reporter"
"github.com/ProtonMail/go-proton-api"
"github.com/ProtonMail/proton-bridge/v3/internal"
"github.com/ProtonMail/proton-bridge/v3/internal/async"
"github.com/ProtonMail/proton-bridge/v3/internal/events"
"github.com/ProtonMail/proton-bridge/v3/internal/logging"
"github.com/ProtonMail/proton-bridge/v3/internal/safe"
@ -65,7 +64,7 @@ type User struct {
reporter reporter.Reporter
sendHash *sendRecorder
eventCh *queue.QueuedChannel[events.Event]
eventCh *async.QueuedChannel[events.Event]
eventLock safe.RWMutex
apiUser proton.User
@ -77,7 +76,7 @@ type User struct {
apiLabels map[string]proton.Label
apiLabelsLock safe.RWMutex
updateCh map[string]*queue.QueuedChannel[imap.Update]
updateCh map[string]*async.QueuedChannel[imap.Update]
updateChLock safe.RWMutex
tasks *async.Group
@ -129,7 +128,7 @@ func New(
reporter: reporter,
sendHash: newSendRecorder(sendEntryExpiry),
eventCh: queue.NewQueuedChannel[events.Event](0, 0, crashHandler),
eventCh: async.NewQueuedChannel[events.Event](0, 0, crashHandler),
eventLock: safe.NewRWMutex(),
apiUser: apiUser,
@ -141,7 +140,7 @@ func New(
apiLabels: groupBy(apiLabels, func(label proton.Label) string { return label.ID }),
apiLabelsLock: safe.NewRWMutex(),
updateCh: make(map[string]*queue.QueuedChannel[imap.Update]),
updateCh: make(map[string]*async.QueuedChannel[imap.Update]),
updateChLock: safe.NewRWMutex(),
tasks: async.NewGroup(context.Background(), crashHandler),
@ -184,7 +183,7 @@ func New(
doneCh := make(chan struct{})
go func() {
defer user.handlePanic()
defer async.HandlePanic(user.panicHandler)
user.pollAPIEventsCh <- doneCh
}()
@ -237,12 +236,6 @@ func New(
return user, nil
}
func (user *User) handlePanic() {
if user.panicHandler != nil {
user.panicHandler.HandlePanic()
}
}
func (user *User) TriggerSync() {
user.goSync()
}
@ -605,11 +598,11 @@ func (user *User) initUpdateCh(mode vault.AddressMode) {
updateCh.CloseAndDiscardQueued()
}
user.updateCh = make(map[string]*queue.QueuedChannel[imap.Update])
user.updateCh = make(map[string]*async.QueuedChannel[imap.Update])
switch mode {
case vault.CombinedMode:
primaryUpdateCh := queue.NewQueuedChannel[imap.Update](0, 0, user.panicHandler)
primaryUpdateCh := async.NewQueuedChannel[imap.Update](0, 0, user.panicHandler)
for addrID := range user.apiAddrs {
user.updateCh[addrID] = primaryUpdateCh
@ -617,7 +610,7 @@ func (user *User) initUpdateCh(mode vault.AddressMode) {
case vault.SplitMode:
for addrID := range user.apiAddrs {
user.updateCh[addrID] = queue.NewQueuedChannel[imap.Update](0, 0, user.panicHandler)
user.updateCh[addrID] = async.NewQueuedChannel[imap.Update](0, 0, user.panicHandler)
}
}
}