From bed2971bf0ca7a258ac0822332ab8c32ec3f6d1a Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 30 Jan 2019 15:11:01 +0000 Subject: [PATCH] lib/pool: a buffer recycling library which can be optionally be used with mmap --- lib/pool/pool.go | 118 ++++++++++++++++++++++++++++++++++++++++++ lib/pool/pool_test.go | 92 ++++++++++++++++++++++++++++++++ 2 files changed, 210 insertions(+) create mode 100644 lib/pool/pool.go create mode 100644 lib/pool/pool_test.go diff --git a/lib/pool/pool.go b/lib/pool/pool.go new file mode 100644 index 000000000..acd44a12c --- /dev/null +++ b/lib/pool/pool.go @@ -0,0 +1,118 @@ +// Package pool implements a memory pool similar in concept to +// sync.Pool but with more determinism. +package pool + +import ( + "fmt" + "log" + "sync/atomic" + "time" + + "github.com/ncw/rclone/lib/mmap" +) + +// Pool of internal buffers +type Pool struct { + cache chan []byte + bufferSize int + timer *time.Timer + inUse int32 + flushTime time.Duration + alloc func(int) ([]byte, error) + free func([]byte) error +} + +// New makes a buffer pool +// +// flushTime is the interval the buffer pools is flushed +// bufferSize is the size of the allocations +// poolSize is the maximum number of free buffers in the pool +// useMmap should be set to use mmap allocations +func New(flushTime time.Duration, bufferSize, poolSize int, useMmap bool) *Pool { + bp := &Pool{ + cache: make(chan []byte, poolSize), + flushTime: flushTime, + bufferSize: bufferSize, + } + if useMmap { + bp.alloc = mmap.Alloc + bp.free = mmap.Free + } else { + bp.alloc = func(size int) ([]byte, error) { + return make([]byte, size), nil + } + bp.free = func([]byte) error { + return nil + } + } + bp.timer = time.AfterFunc(flushTime, bp.Flush) + return bp +} + +// Flush the entire buffer pool +func (bp *Pool) Flush() { + for { + select { + case b := <-bp.cache: + bp.freeBuffer(b) + default: + return + } + } +} + +// InUse returns the approximate number of buffers in use which +// haven't been returned to the pool. +func (bp *Pool) InUse() int { + return int(atomic.LoadInt32(&bp.inUse)) +} + +// starts or resets the buffer flusher timer +func (bp *Pool) kickFlusher() { + bp.timer.Reset(bp.flushTime) +} + +// Get a buffer from the pool or allocate one +func (bp *Pool) Get() []byte { + select { + case b := <-bp.cache: + return b + default: + } + mem, err := bp.alloc(bp.bufferSize) + if err != nil { + log.Printf("Failed to get memory for buffer, waiting for a freed one: %v", err) + return <-bp.cache + } + atomic.AddInt32(&bp.inUse, 1) + return mem +} + +// freeBuffer returns mem to the os if required +func (bp *Pool) freeBuffer(mem []byte) { + err := bp.free(mem) + if err != nil { + log.Printf("Failed to free memory: %v", err) + } else { + atomic.AddInt32(&bp.inUse, -1) + } +} + +// Put returns the buffer to the buffer cache or frees it +// +// Note that if you try to return a buffer of the wrong size to Put it +// will panic. +func (bp *Pool) Put(mem []byte) { + mem = mem[0:cap(mem)] + if len(mem) != bp.bufferSize { + panic(fmt.Sprintf("Returning buffer sized %d but expecting %d", len(mem), bp.bufferSize)) + } + select { + case bp.cache <- mem: + bp.kickFlusher() + return + default: + } + bp.freeBuffer(mem) + mem = nil +} diff --git a/lib/pool/pool_test.go b/lib/pool/pool_test.go new file mode 100644 index 000000000..7f9ceb4f9 --- /dev/null +++ b/lib/pool/pool_test.go @@ -0,0 +1,92 @@ +package pool + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func testGetPut(t *testing.T, useMmap bool) { + bp := New(60*time.Second, 4096, 2, useMmap) + + assert.Equal(t, 0, bp.InUse()) + + b1 := bp.Get() + assert.Equal(t, 1, bp.InUse()) + + b2 := bp.Get() + assert.Equal(t, 2, bp.InUse()) + + b3 := bp.Get() + assert.Equal(t, 3, bp.InUse()) + + bp.Put(b1) + assert.Equal(t, 3, bp.InUse()) + + bp.Put(b2) + assert.Equal(t, 3, bp.InUse()) + + bp.Put(b3) + assert.Equal(t, 2, bp.InUse()) + + b1a := bp.Get() + assert.Equal(t, b1, b1a) + assert.Equal(t, 2, bp.InUse()) + + b2a := bp.Get() + assert.Equal(t, b1, b2a) + assert.Equal(t, 2, bp.InUse()) + + bp.Put(b1a) + bp.Put(b2a) + assert.Equal(t, 2, bp.InUse()) + + bp.Flush() + assert.Equal(t, 0, bp.InUse()) +} + +func testFlusher(t *testing.T, useMmap bool) { + bp := New(50*time.Millisecond, 4096, 2, useMmap) + + b1 := bp.Get() + b2 := bp.Get() + b3 := bp.Get() + bp.Put(b1) + bp.Put(b2) + bp.Put(b3) + assert.Equal(t, 2, bp.InUse()) + + checkFlushHasHappened := func() { + var n int + for i := 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + n = bp.InUse() + if n == 0 { + break + } + } + assert.Equal(t, 0, n) + } + + checkFlushHasHappened() + + b1 = bp.Get() + bp.Put(b1) + assert.Equal(t, 1, bp.InUse()) + + checkFlushHasHappened() +} + +func TestPool(t *testing.T) { + for _, useMmap := range []bool{false, true} { + name := "make" + if useMmap { + name = "mmap" + } + t.Run(name, func(t *testing.T) { + t.Run("GetPut", func(t *testing.T) { testGetPut(t, useMmap) }) + t.Run("Flusher", func(t *testing.T) { testFlusher(t, useMmap) }) + }) + } +}