forked from Silverfish/proton-bridge
fix(GODT-2822): Handle 429 during message download
When we run into 429 during a message download, do not cancel the whole batch and switch to a sequential downloader to avoid API overload.
This commit is contained in:
@ -378,32 +378,18 @@ func (user *User) syncMessages(
|
||||
batchLen int
|
||||
}
|
||||
|
||||
type downloadRequest struct {
|
||||
ids []string
|
||||
expectedSize uint64
|
||||
err error
|
||||
}
|
||||
|
||||
type downloadedMessageBatch struct {
|
||||
batch []proton.FullMessage
|
||||
}
|
||||
|
||||
type builtMessageBatch struct {
|
||||
batch []*buildRes
|
||||
}
|
||||
|
||||
downloadCh := make(chan downloadRequest)
|
||||
|
||||
buildCh := make(chan downloadedMessageBatch)
|
||||
|
||||
// The higher this value, the longer we can continue our download iteration before being blocked on channel writes
|
||||
// to the update flushing goroutine.
|
||||
flushCh := make(chan builtMessageBatch)
|
||||
|
||||
flushUpdateCh := make(chan flushUpdate)
|
||||
|
||||
errorCh := make(chan error, syncLimits.MaxParallelDownloads*4)
|
||||
|
||||
// Go routine in charge of downloading message metadata
|
||||
async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) {
|
||||
defer close(downloadCh)
|
||||
@ -469,65 +455,7 @@ func (user *User) syncMessages(
|
||||
}, logging.Labels{"sync-stage": "meta-data"})
|
||||
|
||||
// Goroutine in charge of downloading and building messages in maxBatchSize batches.
|
||||
async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) {
|
||||
defer close(buildCh)
|
||||
defer close(errorCh)
|
||||
defer func() {
|
||||
logrus.Debugf("sync downloader exit")
|
||||
}()
|
||||
|
||||
attachmentDownloader := user.newAttachmentDownloader(ctx, client, syncLimits.MaxParallelDownloads)
|
||||
defer attachmentDownloader.close()
|
||||
|
||||
for request := range downloadCh {
|
||||
logrus.Debugf("Download request: %v MB:%v", len(request.ids), toMB(request.expectedSize))
|
||||
if request.err != nil {
|
||||
errorCh <- request.err
|
||||
return
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
errorCh <- ctx.Err()
|
||||
return
|
||||
}
|
||||
|
||||
result, err := parallel.MapContext(ctx, syncLimits.MaxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) {
|
||||
defer async.HandlePanic(user.panicHandler)
|
||||
|
||||
var result proton.FullMessage
|
||||
|
||||
msg, err := client.GetMessage(ctx, id)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message")
|
||||
return proton.FullMessage{}, err
|
||||
}
|
||||
|
||||
attachments, err := attachmentDownloader.getAttachments(ctx, msg.Attachments)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("msgID", msg.ID).Error("Failed to download message attachments")
|
||||
return proton.FullMessage{}, err
|
||||
}
|
||||
|
||||
result.Message = msg
|
||||
result.AttData = attachments
|
||||
|
||||
return result, nil
|
||||
})
|
||||
if err != nil {
|
||||
errorCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case buildCh <- downloadedMessageBatch{
|
||||
batch: result,
|
||||
}:
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}, logging.Labels{"sync-stage": "download"})
|
||||
buildCh, errorCh := startSyncDownloader(ctx, user.panicHandler, user.client, downloadCh, syncLimits)
|
||||
|
||||
// Goroutine which builds messages after they have been downloaded
|
||||
async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) {
|
||||
@ -793,93 +721,6 @@ func wantLabels(apiLabels map[string]proton.Label, labelIDs []string) []string {
|
||||
})
|
||||
}
|
||||
|
||||
type attachmentResult struct {
|
||||
attachment []byte
|
||||
err error
|
||||
}
|
||||
|
||||
type attachmentJob struct {
|
||||
id string
|
||||
size int64
|
||||
result chan attachmentResult
|
||||
}
|
||||
|
||||
type attachmentDownloader struct {
|
||||
workerCh chan attachmentJob
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func attachmentWorker(ctx context.Context, client *proton.Client, work <-chan attachmentJob) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case job, ok := <-work:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
var b bytes.Buffer
|
||||
b.Grow(int(job.size))
|
||||
err := client.GetAttachmentInto(ctx, job.id, &b)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(job.result)
|
||||
return
|
||||
case job.result <- attachmentResult{attachment: b.Bytes(), err: err}:
|
||||
close(job.result)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (user *User) newAttachmentDownloader(ctx context.Context, client *proton.Client, workerCount int) *attachmentDownloader {
|
||||
workerCh := make(chan attachmentJob, (workerCount+2)*workerCount)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
for i := 0; i < workerCount; i++ {
|
||||
workerCh = make(chan attachmentJob)
|
||||
async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) { attachmentWorker(ctx, client, workerCh) }, logging.Labels{
|
||||
"sync": fmt.Sprintf("att-downloader %v", i),
|
||||
})
|
||||
}
|
||||
|
||||
return &attachmentDownloader{
|
||||
workerCh: workerCh,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *attachmentDownloader) getAttachments(ctx context.Context, attachments []proton.Attachment) ([][]byte, error) {
|
||||
resultChs := make([]chan attachmentResult, len(attachments))
|
||||
for i, id := range attachments {
|
||||
resultChs[i] = make(chan attachmentResult, 1)
|
||||
select {
|
||||
case a.workerCh <- attachmentJob{id: id.ID, result: resultChs[i], size: id.Size}:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
result := make([][]byte, len(attachments))
|
||||
var err error
|
||||
for i := 0; i < len(attachments); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case r := <-resultChs[i]:
|
||||
if r.err != nil {
|
||||
err = fmt.Errorf("failed to get attachment %v: %w", attachments[i], r.err)
|
||||
}
|
||||
result[i] = r.attachment
|
||||
}
|
||||
}
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (a *attachmentDownloader) close() {
|
||||
a.cancel()
|
||||
}
|
||||
|
||||
func chunkSyncBuilderBatch(batch []proton.FullMessage, maxMemory uint64) [][]proton.FullMessage {
|
||||
var expectedMemUsage uint64
|
||||
var chunks [][]proton.FullMessage
|
||||
|
||||
Reference in New Issue
Block a user