Mirror of https://github.com/ProtonMail/proton-bridge.git (synced 2025-12-11 13:16:53 +00:00)
GODT-1356 GODT-1302: Cache on disk concurrency and API retries

- GODT-1302: Change maximum resty retries from 0 to 30
- GODT-1302: Make sure we are closing the GetAttachment io.ReadCloser on error
- GODT-1356: Do not use attachmentPool - it was useless anyway
- GODT-1356: Increase cache watcher limit to 10 min
- GODT-1356: Start the cache watcher right after startup (do not wait for the first 10 min)
- GODT-1356: Limit the number of buildJobs (memory allocation) in BuildAndCacheMessage
- Other: Pass the context from the job options (message builder) to the fetcher (both message and attachments)
- Other: BuildJob stores the same function as the returned buildDone (proper map locking)
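Of the bullets above, only the builder changes appear in the diff below; the resty retry bump lives in the API client code, which is not shown here. As a rough, hypothetical illustration of that bullet only (assuming a plain go-resty v2 client rather than the bridge's actual pmapi setup):

    package apiclient // hypothetical package, for illustration only

    import "github.com/go-resty/resty/v2"

    // newRetryingClient sketches the retry change: allow up to 30 retries
    // instead of resty's default of no retries at all.
    func newRetryingClient() *resty.Client {
        return resty.New().SetRetryCount(30)
    }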
@@ -58,10 +58,10 @@ type Fetcher interface {
 // The returned builder is ready to handle jobs -- see (*Builder).NewJob for more information.
 //
 // Call (*Builder).Done to shut down the builder and stop all workers.
-func NewBuilder(fetchWorkers, attachWorkers int) *Builder {
-    attacherPool := pool.New(attachWorkers, newAttacherWorkFunc())
+func NewBuilder(fetchWorkers, attachmentWorkers int) *Builder {
+    attachmentPool := pool.New(attachmentWorkers, newAttacherWorkFunc())
 
-    fetcherPool := pool.New(fetchWorkers, newFetcherWorkFunc(attacherPool))
+    fetcherPool := pool.New(fetchWorkers, newFetcherWorkFunc(attachmentPool))
 
     return &Builder{
         pool: fetcherPool,
@@ -87,6 +87,7 @@ func (builder *Builder) NewJobWithOptions(ctx context.Context, fetcher Fetcher,
 
     job, done := builder.pool.NewJob(
         &fetchReq{
+            ctx:       ctx,
             fetcher:   fetcher,
             messageID: messageID,
             options:   opts,
@@ -94,14 +95,7 @@
         prio,
     )
 
-    buildJob := &Job{
-        Job:  job,
-        done: done,
-    }
-
-    builder.jobs[messageID] = buildJob
-
-    return buildJob, func() {
+    buildDone := func() {
         builder.lock.Lock()
         defer builder.lock.Unlock()
 
@@ -111,6 +105,15 @@ func (builder *Builder) NewJobWithOptions(ctx context.Context, fetcher Fetcher,
         // And mark it as done.
         done()
     }
+
+    buildJob := &Job{
+        Job:  job,
+        done: buildDone,
+    }
+
+    builder.jobs[messageID] = buildJob
+
+    return buildJob, buildDone
 }
 
 func (builder *Builder) Done() {
@@ -118,12 +121,14 @@ func (builder *Builder) Done() {
 }
 
 type fetchReq struct {
+    ctx       context.Context
     fetcher   Fetcher
     messageID string
     options   JobOptions
 }
 
 type attachReq struct {
+    ctx     context.Context
     fetcher Fetcher
     message *pmapi.Message
 }
@@ -143,6 +148,10 @@ func (job *Job) GetResult() ([]byte, error) {
     return res.([]byte), nil
 }
 
+// NOTE: This is not used because it is actually not doing what was expected: It
+// downloads all the attachments which belongs to one message sequentially
+// within one goroutine. We should have one job per one attachment. This doesn't look
+// like a bottle neck right now.
 func newAttacherWorkFunc() pool.WorkFunc {
     return func(payload interface{}, prio int) (interface{}, error) {
         req, ok := payload.(*attachReq)
@@ -153,7 +162,7 @@ func newAttacherWorkFunc() pool.WorkFunc {
         res := make(map[string][]byte)
 
         for _, att := range req.message.Attachments {
-            rc, err := req.fetcher.GetAttachment(context.Background(), att.ID)
+            rc, err := req.fetcher.GetAttachment(req.ctx, att.ID)
             if err != nil {
                 return nil, err
             }
@@ -174,32 +183,43 @@
     }
 }
 
-func newFetcherWorkFunc(attacherPool *pool.Pool) pool.WorkFunc {
+func newFetcherWorkFunc(attachmentPool *pool.Pool) pool.WorkFunc {
     return func(payload interface{}, prio int) (interface{}, error) {
         req, ok := payload.(*fetchReq)
         if !ok {
             panic("bad payload type")
         }
 
-        msg, err := req.fetcher.GetMessage(context.Background(), req.messageID)
+        msg, err := req.fetcher.GetMessage(req.ctx, req.messageID)
         if err != nil {
             return nil, err
         }
 
-        attJob, attDone := attacherPool.NewJob(&attachReq{
-            fetcher: req.fetcher,
-            message: msg,
-        }, prio)
-        defer attDone()
-
-        val, err := attJob.GetResult()
-        if err != nil {
-            return nil, err
-        }
-
-        attData, ok := val.(map[string][]byte)
-        if !ok {
-            panic("bad response type")
-        }
+        attData := make(map[string][]byte)
+
+        for _, att := range msg.Attachments {
+            // NOTE: Potential place for optimization:
+            // Use attachmentPool to download each attachment in
+            // separate parallel job. It is not straightforward
+            // because we need to make sure we call attachment-job-done
+            // function in case of any error or after we collect all
+            // attachment bytes asynchronously.
+            rc, err := req.fetcher.GetAttachment(req.ctx, att.ID)
+            if err != nil {
+                return nil, err
+            }
+
+            b, err := ioutil.ReadAll(rc)
+            if err != nil {
+                _ = rc.Close()
+                return nil, err
+            }
+
+            if err := rc.Close(); err != nil {
+                return nil, err
+            }
+
+            attData[att.ID] = b
+        }
 
         kr, err := req.fetcher.KeyRingForAddressID(msg.AddressID)
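The NOTE comments in the new code spell out why attachmentPool is left unused: the attacher work func fetches a message's attachments sequentially inside a single goroutine, whereas the intended design is one pool job per attachment. Below is a minimal sketch of that follow-up, reusing only the pool API visible in this diff (NewJob returning a job plus its done callback, and GetResult). The attachmentReq payload type, the []byte job result, the *pool.Job type name and the helper itself are assumptions rather than code from this commit, and the sketch presumes it sits in the same package as the builder so that Fetcher, pool and pmapi resolve as above:

    // attachmentReq would be the payload for a single-attachment job (hypothetical).
    type attachmentReq struct {
        ctx     context.Context
        fetcher Fetcher
        attID   string
    }

    // fetchAttachmentsInParallel submits one job per attachment and guarantees
    // that every job's done callback runs, even when collecting a result fails.
    func fetchAttachmentsInParallel(ctx context.Context, attachmentPool *pool.Pool, fetcher Fetcher, msg *pmapi.Message, prio int) (map[string][]byte, error) {
        type pendingJob struct {
            attID string
            job   *pool.Job
            done  func()
        }

        pending := make([]pendingJob, 0, len(msg.Attachments))

        // Queue every attachment first so the pool can work on them in parallel.
        for _, att := range msg.Attachments {
            job, done := attachmentPool.NewJob(&attachmentReq{ctx: ctx, fetcher: fetcher, attID: att.ID}, prio)
            pending = append(pending, pendingJob{attID: att.ID, job: job, done: done})
        }

        // Release every job no matter how this function returns.
        defer func() {
            for _, p := range pending {
                p.done()
            }
        }()

        attData := make(map[string][]byte, len(pending))

        for _, p := range pending {
            val, err := p.job.GetResult()
            if err != nil {
                return nil, err
            }

            // Assumes the per-attachment worker returns the raw attachment bytes.
            attData[p.attID] = val.([]byte)
        }

        return attData, nil
    }

Queuing all jobs before collecting any results is what lets the pool download in parallel, and the single deferred loop is what the NOTE means by making sure the attachment-job-done function is called on every path.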