mirror of
https://github.com/ProtonMail/proton-bridge.git
synced 2025-12-10 20:56:51 +00:00
137 lines
3.7 KiB
Go
137 lines
3.7 KiB
Go
// Copyright (c) 2020 Proton Technologies AG
|
|
//
|
|
// This file is part of ProtonMail Bridge.
|
|
//
|
|
// ProtonMail Bridge is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// ProtonMail Bridge is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with ProtonMail Bridge. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package parallel
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// parallelJob is the envelope passed between the feeder, the workers and the
// collectors. `idx` is the position of the item in the input slice, which is
// what allows results to be put back into the original order. `value` holds
// the input item on the way to a worker and the processed result on the way
// back.
type parallelJob struct {
	idx   int
	value interface{}
}

// RunParallel starts up to `workers` workers and feeds them with `input` data.
// Each worker calls `process`. Processed data is collected in the same order as
// the input and is passed in order to the `collect` callback, which is always
// invoked from a single goroutine (never concurrently with itself).
//
// At most len(input) workers are started, and a `workers` value lower than one
// is treated as one. An empty input returns nil without calling any callback.
//
// If `process` or `collect` returns an error, the execution is stopped: no new
// items are handed to workers, `collect` is not called again, and the first
// recorded error is returned. Results that were already produced but not yet
// collected are dropped.
//
// RunParallel blocks until every goroutine it started has finished.
func RunParallel( //nolint[funlen]
	workers int,
	input []interface{},
	process func(interface{}) (interface{}, error),
	collect func(int, interface{}) error,
) (resultError error) {
	// How long the ordered collector waits before looking again for the next
	// expected index when that result has not been produced yet.
	const pollInterval = 10 * time.Millisecond

	// Optimise by not executing the code at all if there is no input
	// or run less workers than requested if there are few inputs.
	inputLen := len(input)
	if inputLen == 0 {
		return nil
	}
	if workers > inputLen {
		workers = inputLen
	}
	// Without at least one worker nothing would ever be processed and the
	// call would block forever (and a negative count would panic in
	// WaitGroup.Add), so fall back to sequential processing.
	if workers < 1 {
		workers = 1
	}

	// `stop` is closed exactly once, by the first failure. Every goroutine
	// selects on it, so nobody stays blocked on a channel after an error.
	// `resultError` is written only inside the Once and read only after both
	// WaitGroups have been waited on, so it needs no additional locking.
	stop := make(chan struct{})
	var failOnce sync.Once
	fail := func(err error) {
		failOnce.Do(func() {
			resultError = err
			close(stop)
		})
	}
	stopped := func() bool {
		select {
		case <-stop:
			return true
		default:
			return false
		}
	}

	wgProcess := &sync.WaitGroup{}
	wgCollect := &sync.WaitGroup{}

	inputChan := make(chan *parallelJob)
	outputChan := make(chan *parallelJob)

	orderedCollectLock := &sync.Mutex{}
	orderedCollect := make(map[int]interface{})

	// Feed input channel used by workers with input data with index for
	// ordering. The feeder is tracked by wgProcess so that it cannot outlive
	// this call.
	wgProcess.Add(1)
	go func() {
		defer wgProcess.Done()
		defer close(inputChan)
		for idx, item := range input {
			if stopped() {
				return
			}
			select {
			case inputChan <- &parallelJob{idx: idx, value: item}:
			case <-stop:
				return
			}
		}
	}()

	// Start workers and process all the inputs.
	wgProcess.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wgProcess.Done()
			for item := range inputChan {
				// Do not start new work once a failure was recorded.
				if stopped() {
					return
				}
				output, err := process(item.value)
				if err != nil {
					fail(err)
					return
				}
				select {
				case outputChan <- &parallelJob{idx: item.idx, value: output}:
				case <-stop:
					return
				}
			}
		}()
	}

	// Collect data into map with the original position in the array. This
	// goroutine keeps draining the output channel until it is closed, so the
	// workers are never blocked by a slow `collect` callback.
	wgCollect.Add(1)
	go func() {
		defer wgCollect.Done()
		for output := range outputChan {
			orderedCollectLock.Lock()
			orderedCollect[output.idx] = output.value
			orderedCollectLock.Unlock()
		}
	}()

	// Collect data in the same order as in the input array.
	wgCollect.Add(1)
	go func() {
		defer wgCollect.Done()

		ticker := time.NewTicker(pollInterval)
		defer ticker.Stop()

		for idx := 0; idx < inputLen; {
			if stopped() {
				return
			}

			orderedCollectLock.Lock()
			value, ok := orderedCollect[idx]
			if ok {
				delete(orderedCollect, idx)
			}
			orderedCollectLock.Unlock()

			if !ok {
				// The next expected result is not ready yet; wait for the
				// next tick, but wake up immediately on failure.
				select {
				case <-ticker.C:
				case <-stop:
					return
				}
				continue
			}

			// The callback runs without the lock held so that storing new
			// results is not stalled while the caller handles this one.
			if err := collect(idx, value); err != nil {
				fail(err)
				return
			}
			idx++
		}
	}()

	// When input channel is closed, all workers will finish. We need to wait
	// for all of them (and for the feeder) and close the output channel only
	// once.
	wgProcess.Wait()
	close(outputChan)

	// When workers are done, the last job is to finish collecting data. First
	// collector is finished when output channel is closed and the second one
	// when all items are passed to `collect` in the order or after an error.
	wgCollect.Wait()

	return resultError
}
|