GODT-1657: Stable sync (still needs more tests)

This commit is contained in:
James Houlahan
2022-10-01 23:14:42 +02:00
parent 705875cff2
commit 9d69a2e565
34 changed files with 1270 additions and 1099 deletions

View File

@ -2,131 +2,174 @@ package user
import (
"context"
"errors"
"fmt"
"runtime"
"strings"
"time"
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/proton-bridge/v2/internal/vault"
"github.com/ProtonMail/gluon/queue"
"github.com/bradenaw/juniper/iterator"
"github.com/bradenaw/juniper/parallel"
"github.com/bradenaw/juniper/stream"
"github.com/bradenaw/juniper/xslices"
"github.com/google/uuid"
"gitlab.protontech.ch/go/liteapi"
"golang.org/x/exp/maps"
)
const chunkSize = 1 << 20
const (
maxUpdateSize = 1 << 25
maxBatchSize = 1 << 8
)
func (user *User) syncLabels(ctx context.Context, addrIDs ...string) error {
func (user *User) sync(ctx context.Context) error {
if !user.vault.SyncStatus().HasLabels {
if err := syncLabels(ctx, user.client, maps.Values(user.updateCh)...); err != nil {
return fmt.Errorf("failed to sync labels: %w", err)
}
if err := user.vault.SetHasLabels(true); err != nil {
return fmt.Errorf("failed to set has labels: %w", err)
}
}
if !user.vault.SyncStatus().HasMessages {
if err := user.syncMessages(ctx); err != nil {
return fmt.Errorf("failed to sync messages: %w", err)
}
if err := user.vault.SetHasMessages(true); err != nil {
return fmt.Errorf("failed to set has messages: %w", err)
}
}
return nil
}
func syncLabels(ctx context.Context, client *liteapi.Client, updateCh ...*queue.QueuedChannel[imap.Update]) error {
// Sync the system folders.
system, err := user.client.GetLabels(ctx, liteapi.LabelTypeSystem)
system, err := client.GetLabels(ctx, liteapi.LabelTypeSystem)
if err != nil {
return err
return fmt.Errorf("failed to get system labels: %w", err)
}
for _, label := range xslices.Filter(system, func(label liteapi.Label) bool { return wantLabelID(label.ID) }) {
for _, addrID := range addrIDs {
user.updateCh[addrID].Enqueue(newSystemMailboxCreatedUpdate(imap.LabelID(label.ID), label.Name))
for _, updateCh := range updateCh {
updateCh.Enqueue(newSystemMailboxCreatedUpdate(imap.LabelID(label.ID), label.Name))
}
}
// Create Folders/Labels mailboxes with a random ID and with the \Noselect attribute.
for _, prefix := range []string{folderPrefix, labelPrefix} {
for _, addrID := range addrIDs {
user.updateCh[addrID].Enqueue(newPlaceHolderMailboxCreatedUpdate(prefix))
for _, updateCh := range updateCh {
updateCh.Enqueue(newPlaceHolderMailboxCreatedUpdate(prefix))
}
}
// Sync the API folders.
folders, err := user.client.GetLabels(ctx, liteapi.LabelTypeFolder)
folders, err := client.GetLabels(ctx, liteapi.LabelTypeFolder)
if err != nil {
return err
return fmt.Errorf("failed to get folders: %w", err)
}
for _, folder := range folders {
for _, addrID := range addrIDs {
user.updateCh[addrID].Enqueue(newMailboxCreatedUpdate(imap.LabelID(folder.ID), []string{folderPrefix, folder.Path}))
for _, updateCh := range updateCh {
updateCh.Enqueue(newMailboxCreatedUpdate(imap.LabelID(folder.ID), []string{folderPrefix, folder.Path}))
}
}
// Sync the API labels.
labels, err := user.client.GetLabels(ctx, liteapi.LabelTypeLabel)
labels, err := client.GetLabels(ctx, liteapi.LabelTypeLabel)
if err != nil {
return err
return fmt.Errorf("failed to get labels: %w", err)
}
for _, label := range labels {
for _, addrID := range addrIDs {
user.updateCh[addrID].Enqueue(newMailboxCreatedUpdate(imap.LabelID(label.ID), []string{labelPrefix, label.Path}))
for _, updateCh := range updateCh {
updateCh.Enqueue(newMailboxCreatedUpdate(imap.LabelID(label.ID), []string{labelPrefix, label.Path}))
}
}
// Wait for all label updates to be applied.
for _, updateCh := range updateCh {
update := imap.NewNoop()
defer update.WaitContext(ctx)
updateCh.Enqueue(update)
}
return nil
}
func (user *User) syncMessages(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Determine which messages to sync.
// TODO: This needs to be done better using the new API route to retrieve just the message IDs.
metadata, err := user.client.GetAllMessageMetadata(ctx, nil)
if err != nil {
return err
return fmt.Errorf("get all message metadata: %w", err)
}
// If in split mode, we need to send each message to a different IMAP connector.
isSplitMode := user.vault.AddressMode() == vault.SplitMode
// Collect the build requests -- we need:
// - the message ID to build,
// - the keyring to decrypt the message,
// - and the address to send the message to (for split mode).
requests := xslices.Map(metadata, func(metadata liteapi.MessageMetadata) request {
var addressID string
if isSplitMode {
addressID = metadata.AddressID
} else {
addressID = user.apiAddrs.primary()
// If possible, begin syncing from the last synced message.
if beginID := user.vault.SyncStatus().LastMessageID; beginID != "" {
if idx := xslices.IndexFunc(metadata, func(metadata liteapi.MessageMetadata) bool {
return metadata.ID == beginID
}); idx >= 0 {
metadata = metadata[idx:]
}
}
return request{
messageID: metadata.ID,
addressID: addressID,
addrKR: user.addrKRs[metadata.AddressID],
}
})
// Process the metadata, building the messages.
buildCh := stream.Chunk(parallel.MapStream(
ctx,
stream.FromIterator(iterator.Slice(metadata)),
runtime.NumCPU()*runtime.NumCPU()/2,
runtime.NumCPU()*runtime.NumCPU()/2,
user.buildRFC822,
), maxBatchSize)
// Create the flushers, one per update channel.
flushers := make(map[string]*flusher)
for addrID, updateCh := range user.updateCh {
flusher := newFlusher(user.ID(), updateCh, user.eventCh, len(requests), chunkSize)
defer flusher.flush()
flusher := newFlusher(user.ID(), updateCh, maxUpdateSize)
defer flusher.flush(ctx, true)
flushers[addrID] = flusher
}
// Build the messages and send them to the correct flusher.
if err := user.builder.Process(ctx, requests, func(req request, res *imap.MessageCreated, err error) error {
if err != nil {
return fmt.Errorf("failed to build message %s: %w", req.messageID, err)
// Create a reporter to report sync progress updates.
reporter := newReporter(user.ID(), user.eventCh, len(metadata), time.Second)
defer reporter.done()
// Send each update to the appropriate flusher.
for {
batch, err := buildCh.Next(ctx)
if errors.Is(err, stream.End) {
return nil
} else if err != nil {
return fmt.Errorf("failed to get next sync batch: %w", err)
}
flushers[req.addressID].push(res)
user.apiAddrs.Get(func(apiAddrs []liteapi.Address) {
for _, res := range batch {
if len(flushers) > 1 {
flushers[res.addressID].push(ctx, res.update)
} else {
flushers[apiAddrs[0].ID].push(ctx, res.update)
}
}
})
return nil
}); err != nil {
return fmt.Errorf("failed to build messages: %w", err)
}
for _, flusher := range flushers {
flusher.flush(ctx, true)
}
return nil
}
if err := user.vault.SetLastMessageID(batch[len(batch)-1].messageID); err != nil {
return fmt.Errorf("failed to set last synced message ID: %w", err)
}
func (user *User) syncWait() {
for _, updateCh := range user.updateCh {
waiter := imap.NewNoop()
defer waiter.Wait()
updateCh.Enqueue(waiter)
reporter.add(len(batch))
}
}