GODT-1657: More stable sync, with some tests

This commit is contained in:
James Houlahan
2022-10-09 23:05:52 +02:00
parent e7526f2e78
commit 509a767e50
41 changed files with 883 additions and 779 deletions

View File

@ -259,7 +259,12 @@ func (user *User) handleMessageEvents(ctx context.Context, messageEvents []litea
}
func (user *User) handleCreateMessageEvent(ctx context.Context, event liteapi.MessageEvent) error {
buildRes, err := user.buildRFC822(ctx, event.Message)
full, err := user.client.GetFullMessage(ctx, event.Message.ID)
if err != nil {
return fmt.Errorf("failed to get full message: %w", err)
}
buildRes, err := buildRFC822(ctx, full, user.addrKRs)
if err != nil {
return fmt.Errorf("failed to build RFC822: %w", err)
}

View File

@ -4,14 +4,11 @@ import (
"context"
"errors"
"fmt"
"runtime"
"strings"
"time"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/queue"
"github.com/bradenaw/juniper/iterator"
"github.com/bradenaw/juniper/parallel"
"github.com/bradenaw/juniper/stream"
"github.com/bradenaw/juniper/xslices"
"github.com/google/uuid"
@ -105,34 +102,38 @@ func syncLabels(ctx context.Context, client *liteapi.Client, updateCh ...*queue.
func (user *User) syncMessages(ctx context.Context) error {
// Determine which messages to sync.
metadata, err := user.client.GetAllMessageMetadata(ctx, nil)
allMetadata, err := user.client.GetAllMessageMetadata(ctx, nil)
if err != nil {
return fmt.Errorf("get all message metadata: %w", err)
}
// If possible, begin syncing from the last synced message.
metadata := allMetadata
// If possible, begin syncing from one beyond the last synced message.
if beginID := user.vault.SyncStatus().LastMessageID; beginID != "" {
if idx := xslices.IndexFunc(metadata, func(metadata liteapi.MessageMetadata) bool {
return metadata.ID == beginID
}); idx >= 0 {
metadata = metadata[idx:]
metadata = metadata[idx+1:]
}
}
// Process the metadata, building the messages.
buildCh := stream.Chunk(parallel.MapStream(
ctx,
stream.FromIterator(iterator.Slice(metadata)),
runtime.NumCPU()*runtime.NumCPU()/2,
runtime.NumCPU()*runtime.NumCPU()/2,
user.buildRFC822,
buildCh := stream.Chunk(stream.Map(
user.client.GetFullMessages(ctx, xslices.Map(metadata, func(metadata liteapi.MessageMetadata) string {
return metadata.ID
})...),
func(ctx context.Context, full liteapi.FullMessage) (*buildRes, error) {
return buildRFC822(ctx, full, user.addrKRs)
},
), maxBatchSize)
defer buildCh.Close()
// Create the flushers, one per update channel.
flushers := make(map[string]*flusher)
for addrID, updateCh := range user.updateCh {
flusher := newFlusher(user.ID(), updateCh, maxUpdateSize)
flusher := newFlusher(updateCh, maxUpdateSize)
defer flusher.flush(ctx, true)
flushers[addrID] = flusher
@ -142,6 +143,8 @@ func (user *User) syncMessages(ctx context.Context) error {
reporter := newReporter(user.ID(), user.eventCh, len(metadata), time.Second)
defer reporter.done()
var count int
// Send each update to the appropriate flusher.
for {
batch, err := buildCh.Next(ctx)
@ -170,6 +173,8 @@ func (user *User) syncMessages(ctx context.Context) error {
}
reporter.add(len(batch))
count += len(batch)
}
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/ProtonMail/proton-bridge/v2/pkg/message"
"github.com/bradenaw/juniper/xslices"
"gitlab.protontech.ch/go/liteapi"
@ -29,30 +30,20 @@ func defaultJobOpts() message.JobOptions {
}
}
func (user *User) buildRFC822(ctx context.Context, metadata liteapi.MessageMetadata) (*buildRes, error) {
msg, err := user.client.GetMessage(ctx, metadata.ID)
func buildRFC822(ctx context.Context, full liteapi.FullMessage, addrKRs map[string]*crypto.KeyRing) (*buildRes, error) {
literal, err := message.BuildRFC822(addrKRs[full.AddressID], full.Message, full.AttData, defaultJobOpts())
if err != nil {
return nil, fmt.Errorf("failed to get message %s: %w", metadata.ID, err)
return nil, fmt.Errorf("failed to build message %s: %w", full.ID, err)
}
attData, err := user.attPool.ProcessAll(ctx, xslices.Map(msg.Attachments, func(att liteapi.Attachment) string { return att.ID }))
update, err := newMessageCreatedUpdate(full.MessageMetadata, literal)
if err != nil {
return nil, fmt.Errorf("failed to get attachments for message %s: %w", metadata.ID, err)
}
literal, err := message.BuildRFC822(user.addrKRs[msg.AddressID], msg, attData, defaultJobOpts())
if err != nil {
return nil, fmt.Errorf("failed to build message %s: %w", metadata.ID, err)
}
update, err := newMessageCreatedUpdate(metadata, literal)
if err != nil {
return nil, fmt.Errorf("failed to create IMAP update for message %s: %w", metadata.ID, err)
return nil, fmt.Errorf("failed to create IMAP update for message %s: %w", full.ID, err)
}
return &buildRes{
messageID: metadata.ID,
addressID: metadata.AddressID,
messageID: full.ID,
addressID: full.AddressID,
update: update,
}, nil
}

View File

@ -9,21 +9,19 @@ import (
)
type flusher struct {
userID string
updateCh *queue.QueuedChannel[imap.Update]
updates []*imap.MessageCreated
updates []*imap.MessageCreated
maxChunkSize int
curChunkSize int
maxUpdateSize int
curChunkSize int
pushLock sync.Mutex
}
func newFlusher(userID string, updateCh *queue.QueuedChannel[imap.Update], maxChunkSize int) *flusher {
func newFlusher(updateCh *queue.QueuedChannel[imap.Update], maxUpdateSize int) *flusher {
return &flusher{
userID: userID,
updateCh: updateCh,
maxChunkSize: maxChunkSize,
updateCh: updateCh,
maxUpdateSize: maxUpdateSize,
}
}
@ -33,20 +31,18 @@ func (f *flusher) push(ctx context.Context, update *imap.MessageCreated) {
f.updates = append(f.updates, update)
if f.curChunkSize += len(update.Literal); f.curChunkSize >= f.maxChunkSize {
if f.curChunkSize += len(update.Literal); f.curChunkSize >= f.maxUpdateSize {
f.flush(ctx, false)
}
}
func (f *flusher) flush(ctx context.Context, wait bool) {
if len(f.updates) == 0 {
return
if len(f.updates) > 0 {
f.updateCh.Enqueue(imap.NewMessagesCreated(f.updates...))
f.updates = nil
f.curChunkSize = 0
}
f.updateCh.Enqueue(imap.NewMessagesCreated(f.updates...))
f.updates = nil
f.curChunkSize = 0
if wait {
update := imap.NewNoop()
defer update.WaitContext(ctx)

View File

@ -5,7 +5,6 @@ import (
"context"
"encoding/hex"
"fmt"
"runtime"
"time"
"github.com/ProtonMail/gluon/connector"
@ -14,7 +13,6 @@ import (
"github.com/ProtonMail/gluon/wait"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/ProtonMail/proton-bridge/v2/internal/events"
"github.com/ProtonMail/proton-bridge/v2/internal/pool"
"github.com/ProtonMail/proton-bridge/v2/internal/safe"
"github.com/ProtonMail/proton-bridge/v2/internal/vault"
"github.com/bradenaw/juniper/xslices"
@ -31,7 +29,6 @@ var (
type User struct {
vault *vault.User
client *liteapi.Client
attPool *pool.Pool[string, []byte]
eventCh *queue.QueuedChannel[events.Event]
apiUser *safe.Type[liteapi.User]
@ -91,7 +88,6 @@ func New(ctx context.Context, encVault *vault.User, client *liteapi.Client, apiU
user := &User{
vault: encVault,
client: client,
attPool: pool.New(runtime.NumCPU(), client.GetAttachment),
eventCh: queue.NewQueuedChannel[events.Event](0, 0),
apiUser: safe.NewType(apiUser),
@ -123,7 +119,7 @@ func New(ctx context.Context, encVault *vault.User, client *liteapi.Client, apiU
// If we haven't synced yet, do it first.
// If it fails, we don't start the event loop.
// Oterwise, begin processing API events, logging any errors that occur.
// Otherwise, begin processing API events, logging any errors that occur.
go func() {
if status := user.vault.SyncStatus(); !status.HasMessages {
if err := <-user.startSync(); err != nil {
@ -336,8 +332,17 @@ func (user *User) NewSMTPSession(email string) (smtp.Session, error) {
}
// Logout logs the user out from the API.
// If withVault is true, the user's vault is also cleared.
func (user *User) Logout(ctx context.Context) error {
return user.client.AuthDelete(ctx)
if err := user.client.AuthDelete(ctx); err != nil {
return fmt.Errorf("failed to delete auth: %w", err)
}
if err := user.vault.Clear(); err != nil {
return fmt.Errorf("failed to clear vault: %w", err)
}
return nil
}
// Close closes ongoing connections and cleans up resources.
@ -345,9 +350,6 @@ func (user *User) Close() error {
// Cancel ongoing syncs.
user.stopSync()
// Close the attachment pool.
user.attPool.Done()
// Close the user's API client.
user.client.Close()

View File

@ -89,13 +89,13 @@ func withAPI(t *testing.T, ctx context.Context, fn func(context.Context, *server
func withAccount(t *testing.T, s *server.Server, username, password string, emails []string, fn func(string, []string)) {
var addrIDs []string
userID, addrID, err := s.CreateUser(username, password, emails[0])
userID, addrID, err := s.CreateUser(username, emails[0], []byte(password))
require.NoError(t, err)
addrIDs = append(addrIDs, addrID)
for _, email := range emails[1:] {
addrID, err := s.CreateAddress(userID, email, password)
addrID, err := s.CreateAddress(userID, email, []byte(password))
require.NoError(t, err)
addrIDs = append(addrIDs, addrID)