From a93a8e7be9923c26cdf1bb48e83691b231675819 Mon Sep 17 00:00:00 2001
From: Jakub
Date: Tue, 19 Oct 2021 17:08:12 +0200
Subject: [PATCH] GODT-1356 GODT-1302: Cache on disk concurrency and API retries

- GODT-1302: Change maximum resty retries from 0 to 30
- GODT-1302: Make sure we are closing the GetAttachment io.ReadCloser on error
- GODT-1356: Do not use attachmentPool - it was useless anyway
- GODT-1356: Increase cache watcher interval to 10 min
- GODT-1356: Start cache watcher right after start (do not wait for the first 10 min)
- GODT-1356: Limit the number of build jobs (memory allocation) in BuildAndCacheMessage
- Other: Pass the context from job options (message builder) to the fetcher (both message and attachments)
- Other: BuildJob.done is the same function as the returned buildDone (proper map locking)
---
 internal/app/bridge/bridge.go   | 24 +++++++----
 internal/store/cache.go         | 21 ++++++++++
 internal/store/cache_watcher.go | 29 ++++++-------
 pkg/message/build.go            | 72 +++++++++++++++++++++------------
 pkg/pmapi/client.go             |  2 +-
 pkg/pmapi/manager.go            |  7 ++++
 pkg/pmapi/manager_test.go       |  7 ++++
 pkg/pmapi/response.go           |  4 ++
 8 files changed, 118 insertions(+), 48 deletions(-)

diff --git a/internal/app/bridge/bridge.go b/internal/app/bridge/bridge.go
index 71cd027f..c17d5c16 100644
--- a/internal/app/bridge/bridge.go
+++ b/internal/app/bridge/bridge.go
@@ -32,6 +32,7 @@ import (
 	"github.com/ProtonMail/proton-bridge/internal/frontend/types"
 	"github.com/ProtonMail/proton-bridge/internal/imap"
 	"github.com/ProtonMail/proton-bridge/internal/smtp"
+	"github.com/ProtonMail/proton-bridge/internal/store"
 	"github.com/ProtonMail/proton-bridge/internal/store/cache"
 	"github.com/ProtonMail/proton-bridge/internal/updater"
 	"github.com/ProtonMail/proton-bridge/pkg/message"
@@ -74,7 +75,7 @@ func run(b *base.Base, c *cli.Context) error { // nolint[funlen]
 		return err
 	}
 
-	cache, err := loadCache(b)
+	cache, err := loadMessageCache(b)
 	if err != nil {
 		return err
 	}
@@ -247,16 +248,20 @@ func checkAndHandleUpdate(u types.Updater, f frontend.Frontend, autoUpdate bool)
 	f.NotifySilentUpdateInstalled()
 }
 
-// NOTE(GODT-1158): How big should in-memory cache be?
-// NOTE(GODT-1158): How to handle cache location migration if user changes custom path?
-func loadCache(b *base.Base) (cache.Cache, error) {
+func loadMessageCache(b *base.Base) (cache.Cache, error) {
 	if !b.Settings.GetBool(settings.CacheEnabledKey) {
-		return cache.NewInMemoryCache(100 * (1 << 20)), nil
+		// The in-memory cache size was estimated from past empirical
+		// usage and set to 100MB.
+		// NOTE: This value must not be less than the maximum size of
+		// one email (~30MB).
+		return cache.NewInMemoryCache(100 << 20), nil
 	}
 
 	var compressor cache.Compressor
 
-	// NOTE(GODT-1158): If user changes compression setting we have to nuke the cache.
+	// NOTE(GODT-1158): Changing compression is currently not exposed to
+	// the user, but if the compression setting ever changes we have to
+	// nuke the cache.
 	if b.Settings.GetBool(settings.CacheCompressionKey) {
 		compressor = &cache.GZipCompressor{}
 	} else {
@@ -269,10 +274,15 @@ func loadCache(b *base.Base) (cache.Cache, error) {
 		path = customPath
 	} else {
 		path = b.Cache.GetDefaultMessageCacheDir()
-		// Store path so it will allways persist if default location will be changed in new version.
+		// Store the path so it always persists even if the default
+		// location changes in a new version.
 		b.Settings.Set(settings.CacheLocationKey, path)
 	}
 
+	// To prevent memory peaks we cap the write concurrency for store
+	// build jobs.
+	store.SetBuildAndCacheJobLimit(b.Settings.GetInt(settings.CacheConcurrencyWrite))
+
 	return cache.NewOnDiskCache(path, compressor, cache.Options{
 		MinFreeAbs: uint64(b.Settings.GetInt(settings.CacheMinFreeAbsKey)),
 		MinFreeRat: b.Settings.GetFloat64(settings.CacheMinFreeRatKey),
diff --git a/internal/store/cache.go b/internal/store/cache.go
index 9c74552b..96fe30ac 100644
--- a/internal/store/cache.go
+++ b/internal/store/cache.go
@@ -89,6 +89,24 @@ func (store *Store) clearCachePassphrase() error {
 	})
 }
 
+// buildAndCacheJobs limits the number of parallel background build jobs by
+// using a buffered channel. While the channel blocks, the goroutine is
+// running but the download has not started yet, so no space needs to be
+// allocated. Once other jobs finish, the blocked job can continue. The
+// bottleneck is `store.cache.Set`, which can take some time to write all
+// downloaded bytes. It is therefore not effective to start fetching and
+// building more messages than the maximum number of parallel cache
+// writers.
+//
+// The default capacity is 16; it can be changed by SetBuildAndCacheJobLimit.
+var (
+	buildAndCacheJobs = make(chan struct{}, 16) //nolint[gochecknoglobals]
+)
+
+func SetBuildAndCacheJobLimit(maxJobs int) {
+	buildAndCacheJobs = make(chan struct{}, maxJobs)
+}
+
 func (store *Store) getCachedMessage(messageID string) ([]byte, error) {
 	if store.cache.Has(store.user.ID(), messageID) {
 		return store.cache.Get(store.user.ID(), messageID)
@@ -118,6 +136,9 @@ func (store *Store) IsCached(messageID string) bool {
 // BuildAndCacheMessage builds the given message (with background priority) and puts it in the cache.
 // It builds with background priority.
 func (store *Store) BuildAndCacheMessage(messageID string) error {
+	buildAndCacheJobs <- struct{}{}
+	defer func() { <-buildAndCacheJobs }()
+
 	job, done := store.newBuildJob(messageID, message.BackgroundPriority)
 	defer done()
 
diff --git a/internal/store/cache_watcher.go b/internal/store/cache_watcher.go
index c8f3811a..cc8701f3 100644
--- a/internal/store/cache_watcher.go
+++ b/internal/store/cache_watcher.go
@@ -23,26 +23,27 @@ func (store *Store) StartWatcher() {
 	store.done = make(chan struct{})
 
 	go func() {
-		ticker := time.NewTicker(3 * time.Minute)
+		ticker := time.NewTicker(10 * time.Minute)
 		defer ticker.Stop()
 
 		for {
+			// NOTE(GODT-1158): Race condition here? What if DB was already closed?
+			messageIDs, err := store.getAllMessageIDs()
+			if err != nil {
+				return
+			}
+
+			for _, messageID := range messageIDs {
+				if !store.IsCached(messageID) {
+					store.cacher.newJob(messageID)
+				}
+			}
+
 			select {
-			case <-ticker.C:
-				// NOTE(GODT-1158): Race condition here? What if DB was already closed?
-				messageIDs, err := store.getAllMessageIDs()
-				if err != nil {
-					return
-				}
-
-				for _, messageID := range messageIDs {
-					if !store.IsCached(messageID) {
-						store.cacher.newJob(messageID)
-					}
-				}
-
 			case <-store.done:
 				return
+			case <-ticker.C:
+				continue
 			}
 		}
 	}()
diff --git a/pkg/message/build.go b/pkg/message/build.go
index 8d20fd7c..cbb8148e 100644
--- a/pkg/message/build.go
+++ b/pkg/message/build.go
@@ -58,10 +58,10 @@ type Fetcher interface {
 // The returned builder is ready to handle jobs -- see (*Builder).NewJob for more information.
 //
 // Call (*Builder).Done to shut down the builder and stop all workers.
-func NewBuilder(fetchWorkers, attachWorkers int) *Builder {
-	attacherPool := pool.New(attachWorkers, newAttacherWorkFunc())
+func NewBuilder(fetchWorkers, attachmentWorkers int) *Builder {
+	attachmentPool := pool.New(attachmentWorkers, newAttacherWorkFunc())
 
-	fetcherPool := pool.New(fetchWorkers, newFetcherWorkFunc(attacherPool))
+	fetcherPool := pool.New(fetchWorkers, newFetcherWorkFunc(attachmentPool))
 
 	return &Builder{
 		pool: fetcherPool,
@@ -87,6 +87,7 @@ func (builder *Builder) NewJobWithOptions(ctx context.Context, fetcher Fetcher,
 
 	job, done := builder.pool.NewJob(
 		&fetchReq{
+			ctx:       ctx,
 			fetcher:   fetcher,
 			messageID: messageID,
 			options:   opts,
@@ -94,14 +95,7 @@ func (builder *Builder) NewJobWithOptions(ctx context.Context, fetcher Fetcher,
 		prio,
 	)
 
-	buildJob := &Job{
-		Job:  job,
-		done: done,
-	}
-
-	builder.jobs[messageID] = buildJob
-
-	return buildJob, func() {
+	buildDone := func() {
 		builder.lock.Lock()
 		defer builder.lock.Unlock()
 
@@ -111,6 +105,15 @@ func (builder *Builder) NewJobWithOptions(ctx context.Context, fetcher Fetcher,
 		// And mark it as done.
 		done()
 	}
+
+	buildJob := &Job{
+		Job:  job,
+		done: buildDone,
+	}
+
+	builder.jobs[messageID] = buildJob
+
+	return buildJob, buildDone
 }
 
 func (builder *Builder) Done() {
@@ -118,12 +121,14 @@ func (builder *Builder) Done() {
 }
 
 type fetchReq struct {
+	ctx       context.Context
 	fetcher   Fetcher
 	messageID string
 	options   JobOptions
 }
 
 type attachReq struct {
+	ctx     context.Context
 	fetcher Fetcher
 	message *pmapi.Message
 }
@@ -143,6 +148,10 @@ func (job *Job) GetResult() ([]byte, error) {
 	return res.([]byte), nil
 }
 
+// NOTE: This is not used because it is not doing what was expected: it
+// downloads all the attachments that belong to one message sequentially
+// within one goroutine. We should have one job per attachment. This does
+// not look like a bottleneck right now.
 func newAttacherWorkFunc() pool.WorkFunc {
 	return func(payload interface{}, prio int) (interface{}, error) {
 		req, ok := payload.(*attachReq)
@@ -153,7 +162,7 @@ func newAttacherWorkFunc() pool.WorkFunc {
 		res := make(map[string][]byte)
 
 		for _, att := range req.message.Attachments {
-			rc, err := req.fetcher.GetAttachment(context.Background(), att.ID)
+			rc, err := req.fetcher.GetAttachment(req.ctx, att.ID)
 			if err != nil {
 				return nil, err
 			}
@@ -174,32 +183,43 @@ func newAttacherWorkFunc() pool.WorkFunc {
 	}
 }
 
-func newFetcherWorkFunc(attacherPool *pool.Pool) pool.WorkFunc {
+func newFetcherWorkFunc(attachmentPool *pool.Pool) pool.WorkFunc {
 	return func(payload interface{}, prio int) (interface{}, error) {
 		req, ok := payload.(*fetchReq)
 		if !ok {
 			panic("bad payload type")
 		}
 
-		msg, err := req.fetcher.GetMessage(context.Background(), req.messageID)
+		msg, err := req.fetcher.GetMessage(req.ctx, req.messageID)
 		if err != nil {
 			return nil, err
 		}
 
-		attJob, attDone := attacherPool.NewJob(&attachReq{
-			fetcher: req.fetcher,
-			message: msg,
-		}, prio)
-		defer attDone()
+		attData := make(map[string][]byte)
 
-		val, err := attJob.GetResult()
-		if err != nil {
-			return nil, err
-		}
+		for _, att := range msg.Attachments {
+			// NOTE: Potential place for optimization:
+			// use attachmentPool to download each attachment in a
+			// separate parallel job. It is not straightforward,
+			// because we need to make sure the attachment-job-done
+			// function is called on any error or after all the
+			// attachment bytes are collected asynchronously.
+			rc, err := req.fetcher.GetAttachment(req.ctx, att.ID)
+			if err != nil {
+				return nil, err
+			}
 
-		attData, ok := val.(map[string][]byte)
-		if !ok {
-			panic("bad response type")
+			b, err := ioutil.ReadAll(rc)
+			if err != nil {
+				_ = rc.Close()
+				return nil, err
+			}
+
+			if err := rc.Close(); err != nil {
+				return nil, err
+			}
+
+			attData[att.ID] = b
 		}
 
 		kr, err := req.fetcher.KeyRingForAddressID(msg.AddressID)
diff --git a/pkg/pmapi/client.go b/pkg/pmapi/client.go
index 4826f130..ef0747ee 100644
--- a/pkg/pmapi/client.go
+++ b/pkg/pmapi/client.go
@@ -84,7 +84,7 @@ func (c *client) r(ctx context.Context) (*resty.Request, error) {
 	return r, nil
 }
 
-// do executes fn and may repeate it in case "401 Unauthorized" error is returned.
+// do executes fn and may repeat it when the request is retried after a "401 Unauthorized" error.
 // Note: fn may be called more than once.
 func (c *client) do(ctx context.Context, fn func(*resty.Request) (*resty.Response, error)) (*resty.Response, error) {
 	r, err := c.r(ctx)
diff --git a/pkg/pmapi/manager.go b/pkg/pmapi/manager.go
index da33f2ed..512b81c2 100644
--- a/pkg/pmapi/manager.go
+++ b/pkg/pmapi/manager.go
@@ -66,6 +66,13 @@ func newManager(cfg Config) *manager {
 	m.rc.OnError(m.handleRequestFailure)
 
 	// Configure retry mechanism.
+	//
+	// SetRetryCount(30): The most probable value of Retry-After is 1s
+	// (max 10s). Retrying up to 30 times will therefore most probably
+	// cause a delay of about 30s. The worst case scenario is 5min, which
+	// is OK for background requests. Foreground requests, which should
+	// finish or fail sooner, should be controlled via their context.
+	m.rc.SetRetryCount(30)
 	m.rc.SetRetryMaxWaitTime(time.Minute)
 	m.rc.SetRetryAfter(catchRetryAfter)
 	m.rc.AddRetryCondition(shouldRetry)
diff --git a/pkg/pmapi/manager_test.go b/pkg/pmapi/manager_test.go
index bf592687..5f6b5115 100644
--- a/pkg/pmapi/manager_test.go
+++ b/pkg/pmapi/manager_test.go
@@ -34,6 +34,11 @@ const testForceUpgradeBody = `{
 	"Error":"Upgrade!"
 }`
 
+const testTooManyAPIRequests = `{
+	"Code":85131,
+	"Error":"Too many recent API requests"
+}`
+
 func TestHandleTooManyRequests(t *testing.T) {
 	var numCalls int
 
@@ -42,6 +47,8 @@ func TestHandleTooManyRequests(t *testing.T) {
 
 		if numCalls < 5 {
-			w.WriteHeader(http.StatusTooManyRequests)
+			w.Header().Set("content-type", "application/json;charset=utf-8")
+			w.WriteHeader(http.StatusTooManyRequests)
+			fmt.Fprint(w, testTooManyAPIRequests)
 		} else {
 			w.WriteHeader(http.StatusOK)
 		}
diff --git a/pkg/pmapi/response.go b/pkg/pmapi/response.go
index ff1bb8cc..f79bb90a 100644
--- a/pkg/pmapi/response.go
+++ b/pkg/pmapi/response.go
@@ -129,6 +129,10 @@ func isTooManyRequest(res *resty.Response) bool {
 }
 
 func isNoResponse(res *resty.Response, err error) bool {
+	// Do not retry TLS failures.
+	if errors.Is(err, ErrTLSMismatch) {
+		return false
+	}
 	return res.RawResponse == nil && err != nil
 }
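
The limiting pattern added in internal/store/cache.go is a plain counting semaphore built from a buffered channel. The standalone sketch below illustrates it outside the bridge; the package layout, channel capacity, and timings are illustrative only, not the bridge's actual values.

package main

import (
	"fmt"
	"sync"
	"time"
)

// jobLimit is a buffered channel used as a counting semaphore: at most
// cap(jobLimit) jobs can hold a slot at the same time. This mirrors the
// buildAndCacheJobs channel in internal/store/cache.go.
var jobLimit = make(chan struct{}, 4)

func buildAndCache(id int) {
	jobLimit <- struct{}{}        // acquire a slot; blocks while 4 jobs run
	defer func() { <-jobLimit }() // release the slot when done

	// Simulate fetching, building and writing a message to the cache.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("cached message", id)
}

func main() {
	var wg sync.WaitGroup

	// Start many goroutines, but only 4 of them do real work at once;
	// the rest block cheaply before allocating any message data.
	for i := 0; i < 16; i++ {
		wg.Add(1)

		go func(id int) {
			defer wg.Done()
			buildAndCache(id)
		}(i)
	}

	wg.Wait()
}

Because a blocked goroutine has not fetched anything yet, the memory held at any moment is bounded by the channel capacity rather than by the number of pending messages.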
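The retry behaviour configured in pkg/pmapi/manager.go can be pictured with a minimal resty client set up in the same spirit. The callbacks below are simplified stand-ins for the bridge's catchRetryAfter and shouldRetry, written as assumptions for illustration rather than the actual implementations.

package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"

	"github.com/go-resty/resty/v2"
)

func newRetryingClient() *resty.Client {
	c := resty.New()

	// Cap the number of retries and the per-attempt backoff, as in the
	// manager.go configuration.
	c.SetRetryCount(30)
	c.SetRetryMaxWaitTime(time.Minute)

	// Honour the server's Retry-After header when present; otherwise fall
	// back to a short fixed delay.
	c.SetRetryAfter(func(client *resty.Client, res *resty.Response) (time.Duration, error) {
		if res != nil {
			if secs, err := strconv.Atoi(res.Header().Get("Retry-After")); err == nil && secs > 0 {
				return time.Duration(secs) * time.Second, nil
			}
		}
		return time.Second, nil
	})

	// Retry on "429 Too Many Requests" and on transport errors that
	// produced no response at all.
	c.AddRetryCondition(func(res *resty.Response, err error) bool {
		if res == nil {
			return err != nil
		}
		if res.StatusCode() == http.StatusTooManyRequests {
			return true
		}
		return res.RawResponse == nil && err != nil
	})

	return c
}

func main() {
	res, err := newRetryingClient().R().Get("https://example.com/")
	if err != nil {
		fmt.Println("request failed after retries:", err)
		return
	}
	fmt.Println("status:", res.StatusCode())
}

With a typical Retry-After of 1s this gives roughly 30s of total delay over 30 retries, while foreground callers are expected to bound their own waiting through the request context.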