forked from Silverfish/proton-bridge
test: Force sync limits to minimum with env variable
Set `BRIDGE_SYNC_FORCE_MINIMUM_SPEC` as environment variable to force all the sync limits to minimum spec. This is enabled for windows builds.
This commit is contained in:
@ -50,6 +50,7 @@ stages:
|
|||||||
# ENV
|
# ENV
|
||||||
.env-windows:
|
.env-windows:
|
||||||
before_script:
|
before_script:
|
||||||
|
- export BRIDGE_SYNC_FORCE_MINIMUM_SPEC=1
|
||||||
- export GOROOT=/c/Go1.20/
|
- export GOROOT=/c/Go1.20/
|
||||||
- export PATH=$GOROOT/bin:$PATH
|
- export PATH=$GOROOT/bin:$PATH
|
||||||
- export GOARCH=amd64
|
- export GOARCH=amd64
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -243,6 +244,46 @@ func toMB(v uint64) float64 {
|
|||||||
return float64(v) / float64(Megabyte)
|
return float64(v) / float64(Megabyte)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type syncLimits struct {
|
||||||
|
MaxDownloadRequestMem uint64
|
||||||
|
MinDownloadRequestMem uint64
|
||||||
|
MaxMessageBuildingMem uint64
|
||||||
|
MinMessageBuildingMem uint64
|
||||||
|
MaxSyncMemory uint64
|
||||||
|
MaxParallelDownloads int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyncLimits(maxSyncMemory uint64) syncLimits {
|
||||||
|
limits := syncLimits{
|
||||||
|
// There's no point in using more than 128MB of download data per stage, after that we reach a point of diminishing
|
||||||
|
// returns as we can't keep the pipeline fed fast enough.
|
||||||
|
MaxDownloadRequestMem: 128 * Megabyte,
|
||||||
|
|
||||||
|
// Any lower than this and we may fail to download messages.
|
||||||
|
MinDownloadRequestMem: 40 * Megabyte,
|
||||||
|
|
||||||
|
// This value can be increased to your hearts content. The more system memory the user has, the more messages
|
||||||
|
// we can build in parallel.
|
||||||
|
MaxMessageBuildingMem: 128 * Megabyte,
|
||||||
|
MinMessageBuildingMem: 64 * Megabyte,
|
||||||
|
|
||||||
|
// Maximum recommend value for parallel downloads by the API team.
|
||||||
|
MaxParallelDownloads: 20,
|
||||||
|
|
||||||
|
MaxSyncMemory: maxSyncMemory,
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := os.LookupEnv("BRIDGE_SYNC_FORCE_MINIMUM_SPEC"); ok {
|
||||||
|
logrus.Warn("Sync specs forced to minimum")
|
||||||
|
limits.MaxDownloadRequestMem = 50 * Megabyte
|
||||||
|
limits.MaxMessageBuildingMem = 80 * Megabyte
|
||||||
|
limits.MaxParallelDownloads = 2
|
||||||
|
limits.MaxSyncMemory = 800 * Megabyte
|
||||||
|
}
|
||||||
|
|
||||||
|
return limits
|
||||||
|
}
|
||||||
|
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (user *User) syncMessages(
|
func (user *User) syncMessages(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@ -255,7 +296,7 @@ func (user *User) syncMessages(
|
|||||||
addrKRs map[string]*crypto.KeyRing,
|
addrKRs map[string]*crypto.KeyRing,
|
||||||
updateCh map[string]*async.QueuedChannel[imap.Update],
|
updateCh map[string]*async.QueuedChannel[imap.Update],
|
||||||
eventCh *async.QueuedChannel[events.Event],
|
eventCh *async.QueuedChannel[events.Event],
|
||||||
maxSyncMemory uint64,
|
cfgMaxSyncMemory uint64,
|
||||||
) error {
|
) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -278,59 +319,51 @@ func (user *User) syncMessages(
|
|||||||
// Expected mem usage for this whole process should be the sum of MaxMessageBuildingMem and MaxDownloadRequestMem
|
// Expected mem usage for this whole process should be the sum of MaxMessageBuildingMem and MaxDownloadRequestMem
|
||||||
// times x due to pipeline and all additional memory used by network requests and compression+io.
|
// times x due to pipeline and all additional memory used by network requests and compression+io.
|
||||||
|
|
||||||
// There's no point in using more than 128MB of download data per stage, after that we reach a point of diminishing
|
|
||||||
// returns as we can't keep the pipeline fed fast enough.
|
|
||||||
const MaxDownloadRequestMem = 128 * Megabyte
|
|
||||||
|
|
||||||
// Any lower than this and we may fail to download messages.
|
|
||||||
const MinDownloadRequestMem = 40 * Megabyte
|
|
||||||
|
|
||||||
// This value can be increased to your hearts content. The more system memory the user has, the more messages
|
|
||||||
// we can build in parallel.
|
|
||||||
const MaxMessageBuildingMem = 128 * Megabyte
|
|
||||||
const MinMessageBuildingMem = 64 * Megabyte
|
|
||||||
|
|
||||||
// Maximum recommend value for parallel downloads by the API team.
|
|
||||||
const maxParallelDownloads = 20
|
|
||||||
|
|
||||||
totalMemory := memory.TotalMemory()
|
totalMemory := memory.TotalMemory()
|
||||||
|
|
||||||
if maxSyncMemory >= totalMemory/2 {
|
syncLimits := newSyncLimits(cfgMaxSyncMemory)
|
||||||
|
|
||||||
|
if syncLimits.MaxSyncMemory >= totalMemory/2 {
|
||||||
logrus.Warnf("Requested max sync memory of %v MB is greater than half of system memory (%v MB), forcing to half of system memory",
|
logrus.Warnf("Requested max sync memory of %v MB is greater than half of system memory (%v MB), forcing to half of system memory",
|
||||||
maxSyncMemory, toMB(totalMemory/2))
|
toMB(syncLimits.MaxSyncMemory), toMB(totalMemory/2))
|
||||||
maxSyncMemory = totalMemory / 2
|
syncLimits.MaxSyncMemory = totalMemory / 2
|
||||||
}
|
}
|
||||||
|
|
||||||
if maxSyncMemory < 800*Megabyte {
|
if syncLimits.MaxSyncMemory < 800*Megabyte {
|
||||||
logrus.Warnf("Requested max sync memory of %v MB, but minimum recommended is 800 MB, forcing max syncMemory to 800MB", toMB(maxSyncMemory))
|
logrus.Warnf("Requested max sync memory of %v MB, but minimum recommended is 800 MB, forcing max syncMemory to 800MB", toMB(syncLimits.MaxSyncMemory))
|
||||||
maxSyncMemory = 800 * Megabyte
|
syncLimits.MaxSyncMemory = 800 * Megabyte
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Total System Memory: %v", toMB(totalMemory))
|
logrus.Debugf("Total System Memory: %v", toMB(totalMemory))
|
||||||
|
|
||||||
syncMaxDownloadRequestMem := MaxDownloadRequestMem
|
// Linter says it's not used. This is a lie.
|
||||||
syncMaxMessageBuildingMem := MaxMessageBuildingMem
|
//nolint: staticcheck
|
||||||
|
syncMaxDownloadRequestMem := syncLimits.MaxDownloadRequestMem
|
||||||
|
|
||||||
|
// Linter says it's not used. This is a lie.
|
||||||
|
//nolint: staticcheck
|
||||||
|
syncMaxMessageBuildingMem := syncLimits.MaxMessageBuildingMem
|
||||||
|
|
||||||
// If less than 2GB available try and limit max memory to 512 MB
|
// If less than 2GB available try and limit max memory to 512 MB
|
||||||
switch {
|
switch {
|
||||||
case maxSyncMemory < 2*Gigabyte:
|
case syncLimits.MaxSyncMemory < 2*Gigabyte:
|
||||||
if maxSyncMemory < 800*Megabyte {
|
if syncLimits.MaxSyncMemory < 800*Megabyte {
|
||||||
logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes")
|
logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes")
|
||||||
}
|
}
|
||||||
syncMaxDownloadRequestMem = MinDownloadRequestMem
|
syncMaxDownloadRequestMem = syncLimits.MinDownloadRequestMem
|
||||||
syncMaxMessageBuildingMem = MinMessageBuildingMem
|
syncMaxMessageBuildingMem = syncLimits.MinMessageBuildingMem
|
||||||
case maxSyncMemory == 2*Gigabyte:
|
case syncLimits.MaxSyncMemory == 2*Gigabyte:
|
||||||
// Increasing the max download capacity has very little effect on sync speed. We could increase the download
|
// Increasing the max download capacity has very little effect on sync speed. We could increase the download
|
||||||
// memory but the user would see less sync notifications. A smaller value here leads to more frequent
|
// memory but the user would see less sync notifications. A smaller value here leads to more frequent
|
||||||
// updates. Additionally, most of ot sync time is spent in the message building.
|
// updates. Additionally, most of ot sync time is spent in the message building.
|
||||||
syncMaxDownloadRequestMem = MaxDownloadRequestMem
|
syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem
|
||||||
// Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage.
|
// Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage.
|
||||||
syncMaxMessageBuildingMem = MaxMessageBuildingMem
|
syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem
|
||||||
default:
|
default:
|
||||||
// Divide by 8 as download stage and build stage will use aprox. 4x the specified memory.
|
// Divide by 8 as download stage and build stage will use aprox. 4x the specified memory.
|
||||||
remainingMemory := (maxSyncMemory - 2*Gigabyte) / 8
|
remainingMemory := (syncLimits.MaxSyncMemory - 2*Gigabyte) / 8
|
||||||
syncMaxDownloadRequestMem = MaxDownloadRequestMem + remainingMemory
|
syncMaxDownloadRequestMem = syncLimits.MaxDownloadRequestMem + remainingMemory
|
||||||
syncMaxMessageBuildingMem = MaxMessageBuildingMem + remainingMemory
|
syncMaxMessageBuildingMem = syncLimits.MaxMessageBuildingMem + remainingMemory
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB",
|
logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB",
|
||||||
@ -369,7 +402,7 @@ func (user *User) syncMessages(
|
|||||||
|
|
||||||
flushUpdateCh := make(chan flushUpdate)
|
flushUpdateCh := make(chan flushUpdate)
|
||||||
|
|
||||||
errorCh := make(chan error, maxParallelDownloads*4)
|
errorCh := make(chan error, syncLimits.MaxParallelDownloads*4)
|
||||||
|
|
||||||
// Go routine in charge of downloading message metadata
|
// Go routine in charge of downloading message metadata
|
||||||
async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) {
|
async.GoAnnotated(ctx, user.panicHandler, func(ctx context.Context) {
|
||||||
@ -443,7 +476,7 @@ func (user *User) syncMessages(
|
|||||||
logrus.Debugf("sync downloader exit")
|
logrus.Debugf("sync downloader exit")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
attachmentDownloader := user.newAttachmentDownloader(ctx, client, maxParallelDownloads)
|
attachmentDownloader := user.newAttachmentDownloader(ctx, client, syncLimits.MaxParallelDownloads)
|
||||||
defer attachmentDownloader.close()
|
defer attachmentDownloader.close()
|
||||||
|
|
||||||
for request := range downloadCh {
|
for request := range downloadCh {
|
||||||
@ -458,7 +491,7 @@ func (user *User) syncMessages(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := parallel.MapContext(ctx, maxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) {
|
result, err := parallel.MapContext(ctx, syncLimits.MaxParallelDownloads, request.ids, func(ctx context.Context, id string) (proton.FullMessage, error) {
|
||||||
defer async.HandlePanic(user.panicHandler)
|
defer async.HandlePanic(user.panicHandler)
|
||||||
|
|
||||||
var result proton.FullMessage
|
var result proton.FullMessage
|
||||||
|
|||||||
Reference in New Issue
Block a user