Files
proton-bridge/internal/user/sync_flusher.go
Leander Beernaert a2ab5df7ce GODT-2085: Revise sync algorithm
Revise syncing work distribution. Sync time can be reduced by up to 50%.

Rework the sync so that it pipelines better with bigger batch counts at
each stage. We now use 3 separate stages: Download, Updates and Sync.

The Download stage downloads messages in maxBatchSize intervals using
1.5x syncWorkers. Once the current batch has finished downloading it's
forwarded to the Updates stage and we proceed to download the next
batch.

The Updates stage converts everything into gluon updates and prepares a
collection of noops that the Sync stage can wait on for termination.

Finally, the Sync stage waits until the updates have been applied in
Gluon so that the vault information can be updated. We currently allow up
to 4 pending wait operations to be queued so as not to block the
pipeline.
2022-11-22 12:32:47 +00:00

55 lines
1.5 KiB
Go

// Copyright (c) 2022 Proton AG
//
// This file is part of Proton Mail Bridge.
//
// Proton Mail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Proton Mail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Proton Mail Bridge. If not, see <https://www.gnu.org/licenses/>.
package user
import (
"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/queue"
)
// flusher buffers message-created updates and hands them to updateCh in
// batches. push triggers a flush once the summed literal size of the buffered
// updates reaches maxUpdateSize.
type flusher struct {
// updateCh is the queue on which batched updates are enqueued by flush.
updateCh *queue.QueuedChannel[imap.Update]
// updates holds the messages pushed since the last flush.
updates []*imap.MessageCreated
// maxUpdateSize is the threshold that curChunkSize is compared against in push.
maxUpdateSize int
// curChunkSize is the running sum of len(update.Literal) over the buffered updates.
curChunkSize int
}
// newFlusher returns an empty flusher that enqueues its batches on updateCh
// and flushes whenever the buffered literal size reaches maxUpdateSize.
func newFlusher(updateCh *queue.QueuedChannel[imap.Update], maxUpdateSize int) *flusher {
	fl := new(flusher)

	// The buffer and the running size deliberately start at their zero values.
	fl.updateCh = updateCh
	fl.maxUpdateSize = maxUpdateSize

	return fl
}
// push appends update to the pending batch and adds the length of its literal
// to the running chunk size. When that size reaches maxUpdateSize the batch is
// flushed immediately.
func (f *flusher) push(update *imap.MessageCreated) {
	f.updates = append(f.updates, update)
	f.curChunkSize += len(update.Literal)

	// Still below the threshold: keep accumulating.
	if f.curChunkSize < f.maxUpdateSize {
		return
	}

	f.flush()
}
// flush enqueues every buffered update on updateCh as a single value built by
// imap.NewMessagesCreated, then clears the buffer and the running chunk size.
// It does nothing when no updates are buffered.
func (f *flusher) flush() {
	if len(f.updates) == 0 {
		return
	}

	batch := imap.NewMessagesCreated(f.updates...)
	f.updateCh.Enqueue(batch)

	// Drop the old slice rather than truncating it: the enqueued batch was
	// built from it and may still reference its backing array.
	f.updates, f.curChunkSize = nil, 0
}