From f8b21ac04a02a784fe14941e5c89940448694b8c Mon Sep 17 00:00:00 2001 From: zeripath Date: Tue, 8 Feb 2022 18:53:34 +0000 Subject: [PATCH] Simplify Boost/Pause logic (#18673) * Simplify Boost/Pause logic #18658 added a check to see if we need to boost because there is still work to do; however, the check is slightly complex and not ideal. There's no point boosting if the queue is paused or can't scale. Therefore merge the two selects into one and add a check on p.paused. Signed-off-by: Andrew Thornton * And on resume add a zeroboost if necessary Signed-off-by: Andrew Thornton * simplify Signed-off-by: Andrew Thornton Co-authored-by: Lauris BH --- modules/queue/workerpool.go | 53 +++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 39ea59b7b1..100197c5e1 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -308,22 +308,18 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, p.cond.Broadcast() cancel() } - if p.hasNoWorkerScaling() { - select { - case <-p.baseCtx.Done(): - // Don't warn if the baseCtx is shutdown - default: + select { + case <-p.baseCtx.Done(): + // Don't warn or check for ongoing work if the baseCtx is shutdown + case <-p.paused: + // Don't warn or check for ongoing work if the pool is paused + default: + if p.hasNoWorkerScaling() { log.Warn( "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) - } - p.pause() - } - select { - case <-p.baseCtx.Done(): - // this worker queue is shut-down don't reboost - default: - if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 { + p.pause() + } else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 { // OK there are no workers but... 
there's still work to be done -> Reboost p.zeroBoost() // p.lock will be unlocked by zeroBoost @@ -385,14 +381,37 @@ func (p *WorkerPool) pause() { // Resume resumes the WorkerPool func (p *WorkerPool) Resume() { - p.lock.Lock() - defer p.lock.Unlock() + p.lock.Lock() // can't defer unlock because of the zeroBoost at the end select { case <-p.resumed: + // already resumed - there's nothing to do + p.lock.Unlock() + return default: - p.paused = make(chan struct{}) - close(p.resumed) } + + p.paused = make(chan struct{}) + close(p.resumed) + + // OK now we need to check if we need to add some workers... + if p.numberOfWorkers > 0 || p.hasNoWorkerScaling() || atomic.LoadInt64(&p.numInQueue) == 0 { + // We either have workers, can't scale or there's no work to be done -> so just resume + p.lock.Unlock() + return + } + + // OK we got some work but no workers we need to think about boosting + select { + case <-p.baseCtx.Done(): + // don't bother boosting if the baseCtx is done + p.lock.Unlock() + return + default: + } + + // OK we'd better add some boost workers! + p.zeroBoost() + // p.zeroBoost will unlock the lock } // CleanUp will drain the remaining contents of the channel