From 4b6d0d035ea71f1afa727d6228354301ea426065 Mon Sep 17 00:00:00 2001
From: Leander Beernaert
Date: Tue, 22 Nov 2022 15:21:39 +0100
Subject: [PATCH] GODT-2085: Ensure minimum sync worker count

Make sure that we are at least using 16 workers for sync, otherwise
multiply the current sync worker count by 2.

Finally, this patch also logs the duration of the time it takes to
transfer all the messages from the server.
---
 internal/user/sync.go            | 13 +++++++++++--
 internal/vault/settings_test.go  |  6 +++---
 internal/vault/types_settings.go | 18 ++++++++++++++++--
 3 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/internal/user/sync.go b/internal/user/sync.go
index 7890f1d2..002f3ce6 100644
--- a/internal/user/sync.go
+++ b/internal/user/sync.go
@@ -20,6 +20,7 @@ package user
 import (
 	"context"
 	"fmt"
+	"runtime"
 	"strings"
 	"time"
 
@@ -32,6 +33,7 @@ import (
 	"github.com/bradenaw/juniper/parallel"
 	"github.com/bradenaw/juniper/xslices"
 	"github.com/google/uuid"
+	"github.com/sirupsen/logrus"
 	"gitlab.protontech.ch/go/liteapi"
 	"golang.org/x/exp/maps"
 )
@@ -197,6 +199,13 @@ func syncMessages( //nolint:funlen
 		return fmt.Errorf("failed to get message IDs to sync: %w", err)
 	}
 
+	syncStartTime := time.Now()
+	defer func() {
+		syncFinishTime := time.Now()
+		logrus.Infof("Message sync completed in %v", syncFinishTime.Sub(syncStartTime))
+	}()
+	logrus.Infof("Starting message sync with syncWorkers=%v (numCpu=%v) for %v messages", syncWorkers, runtime.NumCPU(), len(messageIDs))
+
 	// Create the flushers, one per update channel.
 	flushers := make(map[string]*flusher, len(updateCh))
 
@@ -223,7 +232,7 @@ func syncMessages( //nolint:funlen
 	// Allow up to 4 batched wait requests.
 	flushUpdateCh := make(chan flushUpdate, 4)
 
-	errorCh := make(chan error, syncWorkers*2)
+	errorCh := make(chan error, syncWorkers)
 
 	// Goroutine in charge of downloading and building messages in maxBatchSize batches.
 	go func() {
@@ -236,7 +245,7 @@ func syncMessages( //nolint:funlen
 				return
 			}
 
-			result, err := parallel.MapContext(ctx, int(float32(syncWorkers)*1.5), batch, func(ctx context.Context, id string) (*buildRes, error) {
+			result, err := parallel.MapContext(ctx, syncWorkers, batch, func(ctx context.Context, id string) (*buildRes, error) {
 				msg, err := client.GetFullMessage(ctx, id)
 				if err != nil {
 					return nil, err
diff --git a/internal/vault/settings_test.go b/internal/vault/settings_test.go
index 972c5e9a..163a7e47 100644
--- a/internal/vault/settings_test.go
+++ b/internal/vault/settings_test.go
@@ -18,7 +18,6 @@
 package vault_test
 
 import (
-	"runtime"
 	"testing"
 
 	"github.com/Masterminds/semver/v3"
@@ -222,6 +221,7 @@ func TestVault_Settings_SyncWorkers(t *testing.T) {
 	// create a new test vault.
 	s := newVault(t)
 
-	require.Equal(t, runtime.NumCPU(), s.SyncWorkers())
-	require.Equal(t, runtime.NumCPU(), s.SyncAttPool())
+	syncWorkers := vault.GetDefaultSyncWorkerCount()
+	require.Equal(t, syncWorkers, s.SyncWorkers())
+	require.Equal(t, syncWorkers, s.SyncAttPool())
 }
diff --git a/internal/vault/types_settings.go b/internal/vault/types_settings.go
index 69be3869..252f9b35 100644
--- a/internal/vault/types_settings.go
+++ b/internal/vault/types_settings.go
@@ -49,7 +49,21 @@ type Settings struct {
 	SyncAttPool int
 }
 
+func GetDefaultSyncWorkerCount() int {
+	const minSyncWorkers = 16
+
+	syncWorkers := runtime.NumCPU() * 2
+
+	if syncWorkers < minSyncWorkers {
+		syncWorkers = minSyncWorkers
+	}
+
+	return syncWorkers
+}
+
 func newDefaultSettings(gluonDir string) Settings {
+	syncWorkers := GetDefaultSyncWorkerCount()
+
 	return Settings{
 		GluonDir: gluonDir,
 
@@ -71,7 +85,7 @@ func newDefaultSettings(gluonDir string) Settings {
 		FirstStart:    true,
 		FirstStartGUI: true,
 
-		SyncWorkers: runtime.NumCPU(),
-		SyncAttPool: runtime.NumCPU(),
+		SyncWorkers: syncWorkers,
+		SyncAttPool: syncWorkers,
 	}
 }