diff --git a/internal/bridge/user.go b/internal/bridge/user.go index fa86a313..6d355d2a 100644 --- a/internal/bridge/user.go +++ b/internal/bridge/user.go @@ -462,6 +462,7 @@ func (bridge *Bridge) addUserWithVault( apiUser, bridge.crashHandler, bridge.vault.GetShowAllMail(), + bridge.vault.GetMaxSyncMemory(), ) if err != nil { return fmt.Errorf("failed to create user: %w", err) diff --git a/internal/user/sync.go b/internal/user/sync.go index 0735ed83..ccf7b06b 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -141,6 +141,7 @@ func (user *User) sync(ctx context.Context) error { addrKRs, user.updateCh, user.eventCh, + user.maxSyncMemory, ); err != nil { return fmt.Errorf("failed to sync messages: %w", err) } @@ -229,6 +230,7 @@ func syncMessages( addrKRs map[string]*crypto.KeyRing, updateCh map[string]*queue.QueuedChannel[imap.Update], eventCh *queue.QueuedChannel[events.Event], + maxSyncMemory uint64, ) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -267,25 +269,43 @@ func syncMessages( const maxParallelDownloads = 20 totalMemory := memory.TotalMemory() + + if maxSyncMemory >= totalMemory/2 { + logrus.Warnf("Requested max sync memory of %v MB is greater than half of system memory (%v MB), forcing to half of system memory", + maxSyncMemory, toMB(totalMemory/2)) + maxSyncMemory = totalMemory / 2 + } + + if maxSyncMemory < 800*Megabyte { + logrus.Warnf("Requested max sync memory of %v MB, but minimum recommended is 800 MB, forcing max syncMemory to 800MB", toMB(maxSyncMemory)) + maxSyncMemory = 800 * Megabyte + } + logrus.Debugf("Total System Memory: %v", toMB(totalMemory)) syncMaxDownloadRequestMem := MaxDownloadRequestMem syncMaxMessageBuildingMem := MaxMessageBuildingMem // If less than 2GB available try and limit max memory to 512 MB - if totalMemory < 2*Gigabyte { - if totalMemory < 800*Megabyte { + switch { + case maxSyncMemory < 2*Gigabyte: + if maxSyncMemory < 800*Megabyte { logrus.Warnf("System has less than 800MB of memory, you may experience issues sycing large mailboxes") } syncMaxDownloadRequestMem = MinDownloadRequestMem syncMaxMessageBuildingMem = MinMessageBuildingMem - } else { + case maxSyncMemory == 2*Gigabyte: // Increasing the max download capacity has very little effect on sync speed. We could increase the download // memory but the user would see less sync notifications. A smaller value here leads to more frequent // updates. Additionally, most of ot sync time is spent in the message building. syncMaxDownloadRequestMem = MaxDownloadRequestMem // Currently limited so that if a user has multiple accounts active it also doesn't cause excessive memory usage. syncMaxMessageBuildingMem = MaxMessageBuildingMem + default: + // Divide by 8 as download stage and build stage will use aprox. 4x the specified memory. + remainingMemory := (maxSyncMemory - 2*Gigabyte) / 8 + syncMaxDownloadRequestMem = MaxDownloadRequestMem + remainingMemory + syncMaxMessageBuildingMem = MaxMessageBuildingMem + remainingMemory } logrus.Debugf("Max memory usage for sync Download=%vMB Building=%vMB Predicted Max Total=%vMB", @@ -504,6 +524,8 @@ func syncMessages( } for index, chunk := range chunks { + logrus.Debugf("Build request: %v of %v count=%v", index, len(chunks), len(chunk)) + result, err := parallel.MapContext(ctx, maxMessagesInParallel, chunk, func(ctx context.Context, msg proton.FullMessage) (*buildRes, error) { return buildRFC822(apiLabels, msg, addrKRs[msg.AddressID], new(bytes.Buffer)), nil }) @@ -511,8 +533,6 @@ func syncMessages( return } - logrus.Debugf("Build request: %v of %v", index, len(chunks)) - select { case flushCh <- builtMessageBatch{result}: diff --git a/internal/user/user.go b/internal/user/user.go index abdef220..f1c4b9fd 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -85,6 +85,8 @@ type User struct { goPollAPIEvents func(wait bool) showAllMail uint32 + + maxSyncMemory uint64 } // New returns a new user. @@ -98,6 +100,7 @@ func New( apiUser proton.User, crashHandler async.PanicHandler, showAllMail bool, + maxSyncMemory uint64, ) (*User, error) { //nolint:funlen logrus.WithField("userID", apiUser.ID).Info("Creating new user") @@ -141,6 +144,8 @@ func New( pollAPIEventsCh: make(chan chan struct{}), showAllMail: b32(showAllMail), + + maxSyncMemory: maxSyncMemory, } // Initialize the user's update channels for its current address mode. diff --git a/internal/user/user_test.go b/internal/user/user_test.go index 13efde1c..8542bac2 100644 --- a/internal/user/user_test.go +++ b/internal/user/user_test.go @@ -209,14 +209,14 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *proton.Ma saltedKeyPass, err := salts.SaltForKey([]byte(password), apiUser.Keys.Primary().ID) require.NoError(tb, err) - vault, corrupt, err := vault.New(tb.TempDir(), tb.TempDir(), []byte("my secret key")) + v, corrupt, err := vault.New(tb.TempDir(), tb.TempDir(), []byte("my secret key")) require.NoError(tb, err) require.False(tb, corrupt) - vaultUser, err := vault.AddUser(apiUser.ID, username, username+"@pm.me", apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass) + vaultUser, err := v.AddUser(apiUser.ID, username, username+"@pm.me", apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass) require.NoError(tb, err) - user, err := New(ctx, vaultUser, client, nil, apiUser, nil, true) + user, err := New(ctx, vaultUser, client, nil, apiUser, nil, true, vault.DefaultMaxSyncMemory) require.NoError(tb, err) defer user.Close() diff --git a/internal/vault/settings.go b/internal/vault/settings.go index 904b0374..b452c44f 100644 --- a/internal/vault/settings.go +++ b/internal/vault/settings.go @@ -178,7 +178,7 @@ func (vault *Vault) SetLastVersion(version *semver.Version) error { }) } -// GetFirstStart sets whether this is the first time the bridge has been started. +// GetFirstStart returns whether this is the first time the bridge has been started. func (vault *Vault) GetFirstStart() bool { return vault.get().Settings.FirstStart } @@ -189,3 +189,21 @@ func (vault *Vault) SetFirstStart(firstStart bool) error { data.Settings.FirstStart = firstStart }) } + +// GetMaxSyncMemory returns the maximum amount of memory the sync process should use. +func (vault *Vault) GetMaxSyncMemory() uint64 { + v := vault.get().Settings.MaxSyncMemory + // can be zero if never written to vault before. + if v == 0 { + return DefaultMaxSyncMemory + } + + return v +} + +// SetMaxSyncMemory sets the maximum amount of memory the sync process should use. +func (vault *Vault) SetMaxSyncMemory(maxMemory uint64) error { + return vault.mod(func(data *Data) { + data.Settings.MaxSyncMemory = maxMemory + }) +} diff --git a/internal/vault/settings_test.go b/internal/vault/settings_test.go index 4b90b3ba..fa099e09 100644 --- a/internal/vault/settings_test.go +++ b/internal/vault/settings_test.go @@ -202,3 +202,11 @@ func TestVault_Settings_FirstStart(t *testing.T) { // Check the new first start value. require.Equal(t, false, s.GetFirstStart()) } + +func TestVault_Settings_MaxSyncMemory(t *testing.T) { + // create a new test vault. + s := newVault(t) + + // Check the default first start value. + require.Equal(t, vault.DefaultMaxSyncMemory, s.GetMaxSyncMemory()) +} diff --git a/internal/vault/types_settings.go b/internal/vault/types_settings.go index 4ba00f7e..bb6946a3 100644 --- a/internal/vault/types_settings.go +++ b/internal/vault/types_settings.go @@ -42,8 +42,12 @@ type Settings struct { LastVersion string FirstStart bool + + MaxSyncMemory uint64 } +const DefaultMaxSyncMemory = 2 * 1024 * uint64(1024*1024) + func newDefaultSettings(gluonDir string) Settings { return Settings{ GluonDir: gluonDir, @@ -64,5 +68,7 @@ func newDefaultSettings(gluonDir string) Settings { LastVersion: "0.0.0", FirstStart: true, + + MaxSyncMemory: DefaultMaxSyncMemory, } }