forked from Silverfish/proton-bridge
GODT-2224: Allow the user to specify max sync memory usage in Vault
This commit is contained in:
@ -462,6 +462,7 @@ func (bridge *Bridge) addUserWithVault(
|
|||||||
apiUser,
|
apiUser,
|
||||||
bridge.crashHandler,
|
bridge.crashHandler,
|
||||||
bridge.vault.GetShowAllMail(),
|
bridge.vault.GetShowAllMail(),
|
||||||
|
bridge.vault.GetMaxSyncMemory(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create user: %w", err)
|
return fmt.Errorf("failed to create user: %w", err)
|
||||||
|
|||||||
@ -141,6 +141,7 @@ func (user *User) sync(ctx context.Context) error {
|
|||||||
addrKRs,
|
addrKRs,
|
||||||
user.updateCh,
|
user.updateCh,
|
||||||
user.eventCh,
|
user.eventCh,
|
||||||
|
user.maxSyncMemory,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return fmt.Errorf("failed to sync messages: %w", err)
|
return fmt.Errorf("failed to sync messages: %w", err)
|
||||||
}
|
}
|
||||||
@ -229,6 +230,7 @@ func syncMessages(
|
|||||||
addrKRs map[string]*crypto.KeyRing,
|
addrKRs map[string]*crypto.KeyRing,
|
||||||
updateCh map[string]*queue.QueuedChannel[imap.Update],
|
updateCh map[string]*queue.QueuedChannel[imap.Update],
|
||||||
eventCh *queue.QueuedChannel[events.Event],
|
eventCh *queue.QueuedChannel[events.Event],
|
||||||
|
maxSyncMemory uint64,
|
||||||
) error {
|
) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -267,25 +269,43 @@ func syncMessages(
|
|||||||
const maxParallelDownloads = 20
|
const maxParallelDownloads = 20
|
||||||
|
|
||||||
totalMemory := memory.TotalMemory()
|
totalMemory := memory.TotalMemory()
|
||||||
|
|
||||||
|
if maxSyncMemory >= totalMemory/2 {
|
||||||
|
logrus.Warnf("Requested max sync memory of %v MB is greater than half of system memory (%v MB), forcing to half of system memory",
|
||||||
|
maxSyncMemory, toMB(totalMemory/2))
|
||||||
|
maxSyncMemory = totalMemory / 2
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxSyncMemory < 800*Megabyte {
|
||||||
|
logrus.Warnf("Requested max sync memory of %v MB, but minimum recommended is 800 MB, forcing max syncMemory to 800MB", toMB(maxSyncMemory))
|
||||||
|
maxSyncMemory = 800 * Megabyte
|
||||||
|
}
|
||||||
|
|
||||||
logrus.Debugf("Total System Memory: %v", toMB(totalMemory))
|
logrus.Debugf("Total System Memory: %v", toMB(totalMemory))
|
||||||
|
|
||||||
syncMaxDownloadRequestMem := MaxDownloadRequestMem
|
syncMaxDownloadRequestMem := MaxDownloadRequestMem
|
||||||
syncMaxMessageBuildingMem := MaxMessageBuildingMem
|
syncMaxMessageBuildingMem := MaxMessageBuildingMem
|
||||||
|
|
||||||
// If less than 2GB available try and limit max memory to 512 MB
|
// If less than 2GB available try and limit max memory to 512 MB
|
||||||
if totalMemory < 2*Gigabyte {
|
switch {
|
||||||
if totalMemory < 800*Megabyte {
|
case maxSyncMemory < 2*Gigabyte:
|
||||||
|
if maxSyncMemory < 800*Megabyte {
|
||||||
logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes")
|
logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes")
|
||||||
}
|
}
|
||||||
syncMaxDownloadRequestMem = MinDownloadRequestMem
|
syncMaxDownloadRequestMem = MinDownloadRequestMem
|
||||||
syncMaxMessageBuildingMem = MinMessageBuildingMem
|
syncMaxMessageBuildingMem = MinMessageBuildingMem
|
||||||
} else {
|
case maxSyncMemory == 2*Gigabyte:
|
||||||
// Increasing the max download capacity has very little effect on sync speed. We could increase the download
|
// Increasing the max download capacity has very little effect on sync speed. We could increase the download
|
||||||
// memory but the user would see less sync notifications. A smaller value here leads to more frequent
|
// memory but the user would see less sync notifications. A smaller value here leads to more frequent
|
||||||
// updates. Additionally, most of ot sync time is spent in the message building.
|
// updates. Additionally, most of ot sync time is spent in the message building.
|
||||||
syncMaxDownloadRequestMem = MaxDownloadRequestMem
|
syncMaxDownloadRequestMem = MaxDownloadRequestMem
|
||||||
// Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage.
|
// Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage.
|
||||||
syncMaxMessageBuildingMem = MaxMessageBuildingMem
|
syncMaxMessageBuildingMem = MaxMessageBuildingMem
|
||||||
|
default:
|
||||||
|
// Divide by 8 as download stage and build stage will use aprox. 4x the specified memory.
|
||||||
|
remainingMemory := (maxSyncMemory - 2*Gigabyte) / 8
|
||||||
|
syncMaxDownloadRequestMem = MaxDownloadRequestMem + remainingMemory
|
||||||
|
syncMaxMessageBuildingMem = MaxMessageBuildingMem + remainingMemory
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB",
|
logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB",
|
||||||
@ -504,6 +524,8 @@ func syncMessages(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for index, chunk := range chunks {
|
for index, chunk := range chunks {
|
||||||
|
logrus.Debugf("Build request: %v of %v count=%v", index, len(chunks), len(chunk))
|
||||||
|
|
||||||
result, err := parallel.MapContext(ctx, maxMessagesInParallel, chunk, func(ctx context.Context, msg proton.FullMessage) (*buildRes, error) {
|
result, err := parallel.MapContext(ctx, maxMessagesInParallel, chunk, func(ctx context.Context, msg proton.FullMessage) (*buildRes, error) {
|
||||||
return buildRFC822(apiLabels, msg, addrKRs[msg.AddressID], new(bytes.Buffer)), nil
|
return buildRFC822(apiLabels, msg, addrKRs[msg.AddressID], new(bytes.Buffer)), nil
|
||||||
})
|
})
|
||||||
@ -511,8 +533,6 @@ func syncMessages(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Build request: %v of %v", index, len(chunks))
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case flushCh <- builtMessageBatch{result}:
|
case flushCh <- builtMessageBatch{result}:
|
||||||
|
|
||||||
|
|||||||
@ -85,6 +85,8 @@ type User struct {
|
|||||||
goPollAPIEvents func(wait bool)
|
goPollAPIEvents func(wait bool)
|
||||||
|
|
||||||
showAllMail uint32
|
showAllMail uint32
|
||||||
|
|
||||||
|
maxSyncMemory uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new user.
|
// New returns a new user.
|
||||||
@ -98,6 +100,7 @@ func New(
|
|||||||
apiUser proton.User,
|
apiUser proton.User,
|
||||||
crashHandler async.PanicHandler,
|
crashHandler async.PanicHandler,
|
||||||
showAllMail bool,
|
showAllMail bool,
|
||||||
|
maxSyncMemory uint64,
|
||||||
) (*User, error) { //nolint:funlen
|
) (*User, error) { //nolint:funlen
|
||||||
logrus.WithField("userID", apiUser.ID).Info("Creating new user")
|
logrus.WithField("userID", apiUser.ID).Info("Creating new user")
|
||||||
|
|
||||||
@ -141,6 +144,8 @@ func New(
|
|||||||
pollAPIEventsCh: make(chan chan struct{}),
|
pollAPIEventsCh: make(chan chan struct{}),
|
||||||
|
|
||||||
showAllMail: b32(showAllMail),
|
showAllMail: b32(showAllMail),
|
||||||
|
|
||||||
|
maxSyncMemory: maxSyncMemory,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize the user's update channels for its current address mode.
|
// Initialize the user's update channels for its current address mode.
|
||||||
|
|||||||
@ -209,14 +209,14 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma
|
|||||||
saltedKeyPass, err := salts.SaltForKey([]byte(password), apiUser.Keys.Primary().ID)
|
saltedKeyPass, err := salts.SaltForKey([]byte(password), apiUser.Keys.Primary().ID)
|
||||||
require.NoError(tb, err)
|
require.NoError(tb, err)
|
||||||
|
|
||||||
vault, corrupt, err := vault.New(tb.TempDir(), tb.TempDir(), []byte("my secret key"))
|
v, corrupt, err := vault.New(tb.TempDir(), tb.TempDir(), []byte("my secret key"))
|
||||||
require.NoError(tb, err)
|
require.NoError(tb, err)
|
||||||
require.False(tb, corrupt)
|
require.False(tb, corrupt)
|
||||||
|
|
||||||
vaultUser, err := vault.AddUser(apiUser.ID, username, username+"@pm.me", apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass)
|
vaultUser, err := v.AddUser(apiUser.ID, username, username+"@pm.me", apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass)
|
||||||
require.NoError(tb, err)
|
require.NoError(tb, err)
|
||||||
|
|
||||||
user, err := New(ctx, vaultUser, client, nil, apiUser, nil, true)
|
user, err := New(ctx, vaultUser, client, nil, apiUser, nil, true, vault.DefaultMaxSyncMemory)
|
||||||
require.NoError(tb, err)
|
require.NoError(tb, err)
|
||||||
defer user.Close()
|
defer user.Close()
|
||||||
|
|
||||||
|
|||||||
@ -178,7 +178,7 @@ func (vault *Vault) SetLastVersion(version *semver.Version) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFirstStart sets whether this is the first time the bridge has been started.
|
// GetFirstStart returns whether this is the first time the bridge has been started.
|
||||||
func (vault *Vault) GetFirstStart() bool {
|
func (vault *Vault) GetFirstStart() bool {
|
||||||
return vault.get().Settings.FirstStart
|
return vault.get().Settings.FirstStart
|
||||||
}
|
}
|
||||||
@ -189,3 +189,21 @@ func (vault *Vault) SetFirstStart(firstStart bool) error {
|
|||||||
data.Settings.FirstStart = firstStart
|
data.Settings.FirstStart = firstStart
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetMaxSyncMemory returns the maximum amount of memory the sync process should use.
|
||||||
|
func (vault *Vault) GetMaxSyncMemory() uint64 {
|
||||||
|
v := vault.get().Settings.MaxSyncMemory
|
||||||
|
// can be zero if never written to vault before.
|
||||||
|
if v == 0 {
|
||||||
|
return DefaultMaxSyncMemory
|
||||||
|
}
|
||||||
|
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetMaxSyncMemory sets the maximum amount of memory the sync process should use.
|
||||||
|
func (vault *Vault) SetMaxSyncMemory(maxMemory uint64) error {
|
||||||
|
return vault.mod(func(data *Data) {
|
||||||
|
data.Settings.MaxSyncMemory = maxMemory
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -202,3 +202,11 @@ func TestVault_Settings_FirstStart(t *testing.T) {
|
|||||||
// Check the new first start value.
|
// Check the new first start value.
|
||||||
require.Equal(t, false, s.GetFirstStart())
|
require.Equal(t, false, s.GetFirstStart())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestVault_Settings_MaxSyncMemory(t *testing.T) {
|
||||||
|
// create a new test vault.
|
||||||
|
s := newVault(t)
|
||||||
|
|
||||||
|
// Check the default first start value.
|
||||||
|
require.Equal(t, vault.DefaultMaxSyncMemory, s.GetMaxSyncMemory())
|
||||||
|
}
|
||||||
|
|||||||
@ -42,8 +42,12 @@ type Settings struct {
|
|||||||
|
|
||||||
LastVersion string
|
LastVersion string
|
||||||
FirstStart bool
|
FirstStart bool
|
||||||
|
|
||||||
|
MaxSyncMemory uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const DefaultMaxSyncMemory = 2 * 1024 * uint64(1024*1024)
|
||||||
|
|
||||||
func newDefaultSettings(gluonDir string) Settings {
|
func newDefaultSettings(gluonDir string) Settings {
|
||||||
return Settings{
|
return Settings{
|
||||||
GluonDir: gluonDir,
|
GluonDir: gluonDir,
|
||||||
@ -64,5 +68,7 @@ func newDefaultSettings(gluonDir string) Settings {
|
|||||||
|
|
||||||
LastVersion: "0.0.0",
|
LastVersion: "0.0.0",
|
||||||
FirstStart: true,
|
FirstStart: true,
|
||||||
|
|
||||||
|
MaxSyncMemory: DefaultMaxSyncMemory,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user