cache: fix worker scale down

Ensure that calling scaleWorkers will create/destroy the right amount of
workers.
This commit is contained in:
Fabian Möller 2018-08-30 12:15:30 +02:00
parent cdbe3691b7
commit a0b3fd3a33
1 changed files with 24 additions and 56 deletions

View File

@ -49,12 +49,13 @@ type Handle struct {
offset int64
seenOffsets map[int64]bool
mu sync.Mutex
workersWg sync.WaitGroup
confirmReading chan bool
UseMemory bool
workers []*worker
closed bool
reading bool
workers int
maxWorkerID int
UseMemory bool
closed bool
reading bool
}
// NewObjectHandle returns a new Handle for an existing Object
@ -95,7 +96,7 @@ func (r *Handle) String() string {
// startReadWorkers will start the worker pool
func (r *Handle) startReadWorkers() {
if r.hasAtLeastOneWorker() {
if r.workers > 0 {
return
}
totalWorkers := r.cacheFs().opt.TotalWorkers
@ -117,26 +118,27 @@ func (r *Handle) startReadWorkers() {
// scaleOutWorkers will increase the worker pool count by the provided amount
func (r *Handle) scaleWorkers(desired int) {
current := len(r.workers)
current := r.workers
if current == desired {
return
}
if current > desired {
// scale in gracefully
for i := 0; i < current-desired; i++ {
for r.workers > desired {
r.preloadQueue <- -1
r.workers--
}
} else {
// scale out
for i := 0; i < desired-current; i++ {
for r.workers < desired {
w := &worker{
r: r,
ch: r.preloadQueue,
id: current + i,
id: r.maxWorkerID,
}
r.workersWg.Add(1)
r.workers++
r.maxWorkerID++
go w.run()
r.workers = append(r.workers, w)
}
}
// ignore first scale out from 0
@ -148,7 +150,7 @@ func (r *Handle) scaleWorkers(desired int) {
func (r *Handle) confirmExternalReading() {
// if we have a max value of workers
// then we skip this step
if len(r.workers) > 1 ||
if r.workers > 1 ||
!r.cacheFs().plexConnector.isConfigured() {
return
}
@ -178,7 +180,7 @@ func (r *Handle) queueOffset(offset int64) {
}
}
for i := 0; i < len(r.workers); i++ {
for i := 0; i < r.workers; i++ {
o := r.preloadOffset + int64(r.cacheFs().opt.ChunkSize)*int64(i)
if o < 0 || o >= r.cachedObject.Size() {
continue
@ -193,16 +195,6 @@ func (r *Handle) queueOffset(offset int64) {
}
}
func (r *Handle) hasAtLeastOneWorker() bool {
oneWorker := false
for i := 0; i < len(r.workers); i++ {
if r.workers[i].isRunning() {
oneWorker = true
}
}
return oneWorker
}
// getChunk is called by the FS to retrieve a specific chunk of known start and size from where it can find it
// it can be from transient or persistent cache
// it will also build the chunk from the cache's specific chunk boundaries and build the final desired chunk in a buffer
@ -243,7 +235,7 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) {
// not found in ram or
// the worker didn't managed to download the chunk in time so we abort and close the stream
if err != nil || len(data) == 0 || !found {
if !r.hasAtLeastOneWorker() {
if r.workers == 0 {
fs.Errorf(r, "out of workers")
return nil, io.ErrUnexpectedEOF
}
@ -304,14 +296,7 @@ func (r *Handle) Close() error {
close(r.preloadQueue)
r.closed = true
// wait for workers to complete their jobs before returning
waitCount := 3
for i := 0; i < len(r.workers); i++ {
waitIdx := 0
for r.workers[i].isRunning() && waitIdx < waitCount {
time.Sleep(time.Second)
waitIdx++
}
}
r.workersWg.Wait()
r.memory.db.Flush()
fs.Debugf(r, "cache reader closed %v", r.offset)
@ -348,12 +333,9 @@ func (r *Handle) Seek(offset int64, whence int) (int64, error) {
}
type worker struct {
r *Handle
ch <-chan int64
rc io.ReadCloser
id int
running bool
mu sync.Mutex
r *Handle
rc io.ReadCloser
id int
}
// String is a representation of this worker
@ -398,33 +380,19 @@ func (w *worker) reader(offset, end int64, closeOpen bool) (io.ReadCloser, error
})
}
func (w *worker) isRunning() bool {
w.mu.Lock()
defer w.mu.Unlock()
return w.running
}
func (w *worker) setRunning(f bool) {
w.mu.Lock()
defer w.mu.Unlock()
w.running = f
}
// run is the main loop for the worker which receives offsets to preload
func (w *worker) run() {
var err error
var data []byte
defer w.setRunning(false)
defer func() {
if w.rc != nil {
_ = w.rc.Close()
w.setRunning(false)
}
w.r.workersWg.Done()
}()
for {
chunkStart, open := <-w.ch
w.setRunning(true)
chunkStart, open := <-w.r.preloadQueue
if chunkStart < 0 || !open {
break
}