Other: Read sync workers setting from vault
This commit is contained in:
@ -384,7 +384,15 @@ func (bridge *Bridge) addUserWithVault(
|
|||||||
apiUser liteapi.User,
|
apiUser liteapi.User,
|
||||||
vaultUser *vault.User,
|
vaultUser *vault.User,
|
||||||
) error {
|
) error {
|
||||||
user, err := user.New(ctx, vaultUser, client, apiUser, bridge.GetShowAllMail())
|
user, err := user.New(
|
||||||
|
ctx,
|
||||||
|
vaultUser,
|
||||||
|
client,
|
||||||
|
apiUser,
|
||||||
|
bridge.vault.SyncWorkers(),
|
||||||
|
bridge.vault.SyncBuffer(),
|
||||||
|
bridge.vault.GetShowAllMail(),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create user: %w", err)
|
return fmt.Errorf("failed to create user: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,7 +21,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -95,7 +94,17 @@ func (user *User) sync(ctx context.Context) error {
|
|||||||
if !user.vault.SyncStatus().HasMessages {
|
if !user.vault.SyncStatus().HasMessages {
|
||||||
user.log.Debug("Syncing messages")
|
user.log.Debug("Syncing messages")
|
||||||
|
|
||||||
if err := syncMessages(ctx, user.ID(), user.client, user.vault, addrKRs, user.updateCh, user.eventCh); err != nil {
|
if err := syncMessages(
|
||||||
|
ctx,
|
||||||
|
user.ID(),
|
||||||
|
user.client,
|
||||||
|
user.vault,
|
||||||
|
addrKRs,
|
||||||
|
user.updateCh,
|
||||||
|
user.eventCh,
|
||||||
|
user.syncWorkers,
|
||||||
|
user.syncBuffer,
|
||||||
|
); err != nil {
|
||||||
return fmt.Errorf("failed to sync messages: %w", err)
|
return fmt.Errorf("failed to sync messages: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,6 +185,7 @@ func syncMessages( //nolint:funlen
|
|||||||
addrKRs map[string]*crypto.KeyRing,
|
addrKRs map[string]*crypto.KeyRing,
|
||||||
updateCh map[string]*queue.QueuedChannel[imap.Update],
|
updateCh map[string]*queue.QueuedChannel[imap.Update],
|
||||||
eventCh *queue.QueuedChannel[events.Event],
|
eventCh *queue.QueuedChannel[events.Event],
|
||||||
|
syncWorkers, syncBuffer int,
|
||||||
) error {
|
) error {
|
||||||
// Determine which messages to sync.
|
// Determine which messages to sync.
|
||||||
metadata, err := client.GetAllMessageMetadata(ctx, nil)
|
metadata, err := client.GetAllMessageMetadata(ctx, nil)
|
||||||
@ -195,7 +205,7 @@ func syncMessages( //nolint:funlen
|
|||||||
|
|
||||||
// Fetch and build each message.
|
// Fetch and build each message.
|
||||||
buildCh := stream.Map(
|
buildCh := stream.Map(
|
||||||
client.GetFullMessages(ctx, runtime.NumCPU(), runtime.NumCPU(), messageIDs...),
|
client.GetFullMessages(ctx, syncWorkers, syncBuffer, messageIDs...),
|
||||||
func(_ context.Context, full liteapi.FullMessage) (*buildRes, error) {
|
func(_ context.Context, full liteapi.FullMessage) (*buildRes, error) {
|
||||||
return buildRFC822(full, addrKRs[full.AddressID])
|
return buildRFC822(full, addrKRs[full.AddressID])
|
||||||
},
|
},
|
||||||
|
|||||||
@ -74,6 +74,8 @@ type User struct {
|
|||||||
abortable async.Abortable
|
abortable async.Abortable
|
||||||
goSync func()
|
goSync func()
|
||||||
|
|
||||||
|
syncWorkers int
|
||||||
|
syncBuffer int
|
||||||
showAllMail uint32
|
showAllMail uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,6 +87,7 @@ func New(
|
|||||||
encVault *vault.User,
|
encVault *vault.User,
|
||||||
client *liteapi.Client,
|
client *liteapi.Client,
|
||||||
apiUser liteapi.User,
|
apiUser liteapi.User,
|
||||||
|
syncWorkers, syncBuffer int,
|
||||||
showAllMail bool,
|
showAllMail bool,
|
||||||
) (*User, error) { //nolint:funlen
|
) (*User, error) { //nolint:funlen
|
||||||
logrus.WithField("userID", apiUser.ID).Debug("Creating new user")
|
logrus.WithField("userID", apiUser.ID).Debug("Creating new user")
|
||||||
@ -158,6 +161,8 @@ func New(
|
|||||||
|
|
||||||
tasks: xsync.NewGroup(context.Background()),
|
tasks: xsync.NewGroup(context.Background()),
|
||||||
|
|
||||||
|
syncWorkers: syncWorkers,
|
||||||
|
syncBuffer: syncBuffer,
|
||||||
showAllMail: b32(showAllMail),
|
showAllMail: b32(showAllMail),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -182,7 +182,7 @@ func withUser(tb testing.TB, ctx context.Context, _ *server.Server, m *liteapi.M
|
|||||||
vaultUser, err := vault.AddUser(apiUser.ID, username, apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass)
|
vaultUser, err := vault.AddUser(apiUser.ID, username, apiAuth.UID, apiAuth.RefreshToken, saltedKeyPass)
|
||||||
require.NoError(tb, err)
|
require.NoError(tb, err)
|
||||||
|
|
||||||
user, err := New(ctx, vaultUser, client, apiUser, true)
|
user, err := New(ctx, vaultUser, client, apiUser, vault.SyncWorkers(), vault.SyncBuffer(), true)
|
||||||
require.NoError(tb, err)
|
require.NoError(tb, err)
|
||||||
defer user.Close()
|
defer user.Close()
|
||||||
|
|
||||||
|
|||||||
@ -201,3 +201,13 @@ func (vault *Vault) SetFirstStartGUI(firstStartGUI bool) error {
|
|||||||
data.Settings.FirstStartGUI = firstStartGUI
|
data.Settings.FirstStartGUI = firstStartGUI
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SyncWorkers returns the number of workers to use for syncing.
|
||||||
|
func (vault *Vault) SyncWorkers() int {
|
||||||
|
return vault.get().Settings.SyncWorkers
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncBuffer returns the number of buffer workers to use for syncing.
|
||||||
|
func (vault *Vault) SyncBuffer() int {
|
||||||
|
return vault.get().Settings.SyncBuffer
|
||||||
|
}
|
||||||
|
|||||||
@ -18,6 +18,7 @@
|
|||||||
package vault_test
|
package vault_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/Masterminds/semver/v3"
|
"github.com/Masterminds/semver/v3"
|
||||||
@ -216,3 +217,11 @@ func TestVault_Settings_FirstStartGUI(t *testing.T) {
|
|||||||
// Check the new first start value.
|
// Check the new first start value.
|
||||||
require.Equal(t, false, s.GetFirstStartGUI())
|
require.Equal(t, false, s.GetFirstStartGUI())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestVault_Settings_SyncWorkers(t *testing.T) {
|
||||||
|
// create a new test vault.
|
||||||
|
s := newVault(t)
|
||||||
|
|
||||||
|
require.Equal(t, runtime.NumCPU(), s.SyncWorkers())
|
||||||
|
require.Equal(t, runtime.NumCPU(), s.SyncBuffer())
|
||||||
|
}
|
||||||
|
|||||||
@ -19,6 +19,7 @@ package vault
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
"github.com/ProtonMail/gluon/imap"
|
"github.com/ProtonMail/gluon/imap"
|
||||||
"github.com/ProtonMail/proton-bridge/v2/internal/updater"
|
"github.com/ProtonMail/proton-bridge/v2/internal/updater"
|
||||||
@ -60,6 +61,9 @@ type Settings struct {
|
|||||||
LastVersion string
|
LastVersion string
|
||||||
FirstStart bool
|
FirstStart bool
|
||||||
FirstStartGUI bool
|
FirstStartGUI bool
|
||||||
|
|
||||||
|
SyncWorkers int
|
||||||
|
SyncBuffer int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDefaultSettings(gluonDir string) Settings {
|
func newDefaultSettings(gluonDir string) Settings {
|
||||||
@ -83,6 +87,9 @@ func newDefaultSettings(gluonDir string) Settings {
|
|||||||
LastVersion: "0.0.0",
|
LastVersion: "0.0.0",
|
||||||
FirstStart: true,
|
FirstStart: true,
|
||||||
FirstStartGUI: true,
|
FirstStartGUI: true,
|
||||||
|
|
||||||
|
SyncWorkers: runtime.NumCPU(),
|
||||||
|
SyncBuffer: runtime.NumCPU(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user