diff --git a/internal/user/sync.go b/internal/user/sync.go index 7890f1d2..002f3ce6 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -20,6 +20,7 @@ package user import ( "context" "fmt" + "runtime" "strings" "time" @@ -32,6 +33,7 @@ import ( "github.com/bradenaw/juniper/parallel" "github.com/bradenaw/juniper/xslices" "github.com/google/uuid" + "github.com/sirupsen/logrus" "gitlab.protontech.ch/go/liteapi" "golang.org/x/exp/maps" ) @@ -197,6 +199,13 @@ func syncMessages( //nolint:funlen return fmt.Errorf("failed to get message IDs to sync: %w", err) } + syncStartTime := time.Now() + defer func() { + syncFinishTime := time.Now() + logrus.Infof("Message sync completed in %v", syncFinishTime.Sub(syncStartTime)) + }() + logrus.Infof("Starting message sync with syncWorkers=%v (numCpu=%v) for %v messages", syncWorkers, runtime.NumCPU(), len(messageIDs)) + // Create the flushers, one per update channel. flushers := make(map[string]*flusher, len(updateCh)) @@ -223,7 +232,7 @@ func syncMessages( //nolint:funlen // Allow up to 4 batched wait requests. flushUpdateCh := make(chan flushUpdate, 4) - errorCh := make(chan error, syncWorkers*2) + errorCh := make(chan error, syncWorkers) // Goroutine in charge of downloading and building messages in maxBatchSize batches. go func() { @@ -236,7 +245,7 @@ func syncMessages( //nolint:funlen return } - result, err := parallel.MapContext(ctx, int(float32(syncWorkers)*1.5), batch, func(ctx context.Context, id string) (*buildRes, error) { + result, err := parallel.MapContext(ctx, syncWorkers, batch, func(ctx context.Context, id string) (*buildRes, error) { msg, err := client.GetFullMessage(ctx, id) if err != nil { return nil, err diff --git a/internal/vault/settings_test.go b/internal/vault/settings_test.go index 972c5e9a..163a7e47 100644 --- a/internal/vault/settings_test.go +++ b/internal/vault/settings_test.go @@ -18,7 +18,6 @@ package vault_test import ( - "runtime" "testing" "github.com/Masterminds/semver/v3" @@ -222,6 +221,7 @@ func TestVault_Settings_SyncWorkers(t *testing.T) { // create a new test vault. s := newVault(t) - require.Equal(t, runtime.NumCPU(), s.SyncWorkers()) - require.Equal(t, runtime.NumCPU(), s.SyncAttPool()) + syncWorkers := vault.GetDefaultSyncWorkerCount() + require.Equal(t, syncWorkers, s.SyncWorkers()) + require.Equal(t, syncWorkers, s.SyncAttPool()) } diff --git a/internal/vault/types_settings.go b/internal/vault/types_settings.go index 69be3869..252f9b35 100644 --- a/internal/vault/types_settings.go +++ b/internal/vault/types_settings.go @@ -49,7 +49,21 @@ type Settings struct { SyncAttPool int } +func GetDefaultSyncWorkerCount() int { + const minSyncWorkers = 16 + + syncWorkers := runtime.NumCPU() * 2 + + if syncWorkers < minSyncWorkers { + syncWorkers = minSyncWorkers + } + + return syncWorkers +} + func newDefaultSettings(gluonDir string) Settings { + syncWorkers := GetDefaultSyncWorkerCount() + return Settings{ GluonDir: gluonDir, @@ -71,7 +85,7 @@ func newDefaultSettings(gluonDir string) Settings { FirstStart: true, FirstStartGUI: true, - SyncWorkers: runtime.NumCPU(), - SyncAttPool: runtime.NumCPU(), + SyncWorkers: syncWorkers, + SyncAttPool: syncWorkers, } }