GODT-1367: use waitgroup instead of channel in pool/pchan

This commit is contained in:
James Houlahan
2021-10-05 15:03:24 +02:00
committed by Jakub
parent 8ea610c625
commit bc21bb1d8d
2 changed files with 17 additions and 17 deletions

View File

@ -33,11 +33,11 @@ type Item struct {
ch *PChan ch *PChan
val interface{} val interface{}
prio int prio int
done chan struct{} done sync.WaitGroup
} }
func (item *Item) Wait() { func (item *Item) Wait() {
<-item.done item.done.Wait()
} }
func (item *Item) GetPriority() int { func (item *Item) GetPriority() int {
@ -90,15 +90,14 @@ func (ch *PChan) push(val interface{}, prio int) *Item {
ch.lock.Lock() ch.lock.Lock()
defer ch.lock.Unlock() defer ch.lock.Unlock()
done := make(chan struct{})
item := &Item{ item := &Item{
ch: ch, ch: ch,
val: val, val: val,
prio: prio, prio: prio,
done: done,
} }
item.done.Add(1)
ch.items = append(ch.items, item) ch.items = append(ch.items, item)
return item return item
@ -116,7 +115,7 @@ func (ch *PChan) pop() (interface{}, int) {
item, ch.items = ch.items[len(ch.items)-1], ch.items[:len(ch.items)-1] item, ch.items = ch.items[len(ch.items)-1], ch.items[:len(ch.items)-1]
defer close(item.done) defer item.done.Done()
return item.val, item.prio return item.val, item.prio
} }

View File

@ -77,20 +77,21 @@ type Job struct {
item *pchan.Item item *pchan.Item
ready, done chan struct{} ready, done sync.WaitGroup
once sync.Once once sync.Once
} }
func newJob(req interface{}) *Job { func newJob(req interface{}) *Job {
return &Job{ job := &Job{req: req}
req: req,
ready: make(chan struct{}), job.ready.Add(1)
done: make(chan struct{}), job.done.Add(1)
}
return job
} }
func (job *Job) GetResult() (interface{}, error) { func (job *Job) GetResult() (interface{}, error) {
<-job.ready job.ready.Wait()
return job.res, job.err return job.res, job.err
} }
@ -104,13 +105,13 @@ func (job *Job) SetPriority(prio int) {
} }
func (job *Job) postSuccess(res interface{}) { func (job *Job) postSuccess(res interface{}) {
defer close(job.ready) defer job.ready.Done()
job.res = res job.res = res
} }
func (job *Job) postFailure(err error) { func (job *Job) postFailure(err error) {
defer close(job.ready) defer job.ready.Done()
job.err = err job.err = err
} }
@ -120,9 +121,9 @@ func (job *Job) setItem(item *pchan.Item) {
} }
func (job *Job) markDone() { func (job *Job) markDone() {
job.once.Do(func() { close(job.done) }) job.once.Do(func() { job.done.Done() })
} }
func (job *Job) waitDone() { func (job *Job) waitDone() {
<-job.done job.done.Wait()
} }