Compare commits
9 Commits
7aa1f19f56
...
dev-docs
| Author | SHA1 | Date | |
|---|---|---|---|
| 83175e939e | |||
| 42ef3ec97e | |||
| 4f01e91634 | |||
| 08252778ba | |||
| e3ccab83ab | |||
| 9553f87292 | |||
| dbebf59b6b | |||
| 9ee2d64865 | |||
| c64e032934 |
2
README.md
Normal file
2
README.md
Normal file
@@ -0,0 +1,2 @@
|
||||
# MultiReader
|
||||
Single writer with multiple readers. Designed for small number of concurrent readers.
|
||||
@@ -2,6 +2,7 @@ package multireader
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
@@ -46,9 +47,10 @@ func TestReadWrite(t *testing.T) {
|
||||
|
||||
writeFunc := func(t *testing.T, m *MultiReader, data []byte, wg *sync.WaitGroup) {
|
||||
var (
|
||||
n int
|
||||
dl int
|
||||
r int
|
||||
n int
|
||||
dl int
|
||||
r int
|
||||
loopFlag bool
|
||||
)
|
||||
defer wg.Done()
|
||||
|
||||
@@ -60,13 +62,21 @@ func TestReadWrite(t *testing.T) {
|
||||
r, _ = m.Write(data[n : n+512+dl])
|
||||
}
|
||||
n += r
|
||||
if rand.Intn(5) == 0 {
|
||||
loopFlag = false
|
||||
for !loopFlag {
|
||||
if rand.Intn(100) == 0 {
|
||||
loopFlag = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = m.Close()
|
||||
}
|
||||
wg.Add(1)
|
||||
go writeFunc(t, m, src, wg)
|
||||
|
||||
readFunc := func(t *testing.T, r *Reader, data []byte, wg *sync.WaitGroup) {
|
||||
readFunc := func(t *testing.T, r *Reader, data []byte, wg *sync.WaitGroup, split bool) {
|
||||
var (
|
||||
offset int
|
||||
n int
|
||||
@@ -77,10 +87,30 @@ func TestReadWrite(t *testing.T) {
|
||||
offset = 0
|
||||
for {
|
||||
if offset+1024 < len(data) {
|
||||
n, err = r.Read(data[offset : offset+1024])
|
||||
if err != nil {
|
||||
t.Logf("reader got error: %v", err)
|
||||
break
|
||||
if split {
|
||||
_, err = r.WaitAvailable()
|
||||
if err != nil {
|
||||
t.Logf("reader got error on WaitAvailable: %v", err)
|
||||
break
|
||||
}
|
||||
n, err = r.ReadAhead(data[offset : offset+1024])
|
||||
if err != nil {
|
||||
t.Logf("reader got error on ReadAhead: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
n -= 5
|
||||
_, err = r.Seek(int64(n), io.SeekCurrent)
|
||||
if err != nil {
|
||||
t.Logf("reader got error on Seek: %v", err)
|
||||
break
|
||||
}
|
||||
} else {
|
||||
n, err = r.Read(data[offset : offset+1024])
|
||||
if err != nil {
|
||||
t.Logf("reader got error: %v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
n, err = r.Read(data[offset:])
|
||||
@@ -95,11 +125,11 @@ func TestReadWrite(t *testing.T) {
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go readFunc(t, r1, dst1, wg)
|
||||
go readFunc(t, r1, dst1, wg, false)
|
||||
wg.Add(1)
|
||||
go readFunc(t, r2, dst2, wg)
|
||||
go readFunc(t, r2, dst2, wg, false)
|
||||
wg.Add(1)
|
||||
go readFunc(t, r3, dst3, wg)
|
||||
go readFunc(t, r3, dst3, wg, true)
|
||||
|
||||
wg.Wait()
|
||||
if !bytes.Equal(src, dst1) {
|
||||
@@ -112,3 +142,87 @@ func TestReadWrite(t *testing.T) {
|
||||
t.Error("src and dst3 mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReaderClose(t *testing.T) {
|
||||
var (
|
||||
m *MultiReader
|
||||
r1 *Reader
|
||||
r2 *Reader
|
||||
r3 *Reader
|
||||
src, dst, dst1, dst3 []byte
|
||||
err error
|
||||
found bool
|
||||
ctr int
|
||||
)
|
||||
|
||||
src = make([]byte, 16)
|
||||
dst = make([]byte, 32)
|
||||
dst1 = dst[:16]
|
||||
dst3 = dst[16:]
|
||||
m = NewMultiReader(0)
|
||||
r1 = m.NewReader()
|
||||
r2 = m.NewReader()
|
||||
r3 = m.NewReader()
|
||||
|
||||
_, _ = rand.Read(src)
|
||||
_, err = m.Write(src)
|
||||
if err != nil {
|
||||
t.Fatalf("error setting up data: %v", err)
|
||||
}
|
||||
|
||||
found = false
|
||||
for ctr = range m.pos {
|
||||
if m.pos[ctr].id == r2.id {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("unexpected behavior, r2 should exist in reader position list")
|
||||
}
|
||||
|
||||
_ = r2.Close()
|
||||
_, err = r2.Read(dst)
|
||||
if err == nil && err != ErrReaderClosed {
|
||||
t.Error("unexpected behavior, r2 should be closed")
|
||||
}
|
||||
|
||||
found = false
|
||||
for ctr = range m.pos {
|
||||
if m.pos[ctr].id == r2.id {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if found {
|
||||
t.Error("unexpected behavior, r2 should be removed from reader position list")
|
||||
}
|
||||
|
||||
_, err = r1.Read(dst1)
|
||||
if err != nil {
|
||||
t.Errorf("failing to read from r1")
|
||||
}
|
||||
if !bytes.Equal(dst1, src) {
|
||||
t.Error("invalid result reading from r1")
|
||||
}
|
||||
|
||||
_ = m.Close()
|
||||
_, err = r3.Read(dst3)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected behavior, cannot read form r3: %v", err)
|
||||
}
|
||||
if !bytes.Equal(src, dst3) {
|
||||
t.Error("invalid result reading from r3")
|
||||
}
|
||||
|
||||
_, err = r3.WaitAvailable()
|
||||
if err == nil {
|
||||
t.Error("unexpected behavior; WaitAvailable when readpos == lastWritePos should fail")
|
||||
}
|
||||
|
||||
_, err = r3.ReadAhead(dst3)
|
||||
if err == nil {
|
||||
t.Error("unexpected behavior; ReadAhead when readpos == lastWritePos should fail")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
//Package multireader provides capability for single writer to multiple reader
|
||||
package multireader
|
||||
|
||||
import (
|
||||
@@ -33,6 +34,7 @@ type Reader struct {
|
||||
closed bool
|
||||
}
|
||||
|
||||
//MultiReader initialize new MultiReader object. The object also implements WriteCloser.
|
||||
func NewMultiReader(len int) *MultiReader {
|
||||
if len < 0 {
|
||||
len = 0
|
||||
@@ -51,6 +53,7 @@ func NewMultiReader(len int) *MultiReader {
|
||||
}
|
||||
}
|
||||
|
||||
// Write implements Writer interface. The function will not return error. Error return only satisfy the interface.
|
||||
func (m *MultiReader) Write(data []byte) (int, error) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
@@ -91,6 +94,32 @@ func (m *MultiReader) shift() {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MultiReader) removeReader(id int) {
|
||||
var (
|
||||
pin int
|
||||
nb []*Reader
|
||||
)
|
||||
for idx := range m.pos {
|
||||
if m.pos[idx].id == id {
|
||||
pin = idx
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
nb = make([]*Reader, len(m.pos)-1)
|
||||
if pin == 0 {
|
||||
copy(nb, m.pos[1:])
|
||||
} else if pin == len(m.pos)-1 {
|
||||
copy(nb, m.pos[:len(m.pos)-1])
|
||||
} else {
|
||||
copy(nb, m.pos[:pin])
|
||||
copy(nb[pin:], m.pos[pin+1:])
|
||||
}
|
||||
|
||||
m.pos = nb
|
||||
}
|
||||
|
||||
// Close will not return error. The error return only satisfy the interface signature.
|
||||
func (m *MultiReader) Close() error {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
@@ -98,6 +127,7 @@ func (m *MultiReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewReader initialize new Reader object. The object implements ReadCloser and Seeker interface.
|
||||
func (m *MultiReader) NewReader() *Reader {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
@@ -116,6 +146,8 @@ func (m *MultiReader) NewReader() *Reader {
|
||||
return reader
|
||||
}
|
||||
|
||||
// Read implements Reader interface. If the reader reached the same position with the writer. It will wait.
|
||||
// Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek()
|
||||
func (r *Reader) Read(data []byte) (int, error) {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
@@ -126,15 +158,9 @@ func (r *Reader) Read(data []byte) (int, error) {
|
||||
err error
|
||||
)
|
||||
|
||||
if r.closed {
|
||||
return 0, ErrReaderClosed
|
||||
}
|
||||
|
||||
for r.readPos >= r.multiReader.lastWritePos {
|
||||
if r.multiReader.closed {
|
||||
return 0, ErrMultiReaderClosed
|
||||
}
|
||||
r.cond.Wait()
|
||||
_, err = r.waitAvailable()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err = r.readAhead(data)
|
||||
@@ -155,6 +181,8 @@ func (r *Reader) seek(offset int, whence int) (int, error) {
|
||||
r.readPos = offset
|
||||
case io.SeekEnd:
|
||||
r.readPos = r.multiReader.lastWritePos - offset
|
||||
default:
|
||||
return 0, ErrUnsupportedWhence
|
||||
}
|
||||
|
||||
r.multiReader.sortLastPos()
|
||||
@@ -163,6 +191,8 @@ func (r *Reader) seek(offset int, whence int) (int, error) {
|
||||
return r.readPos, nil
|
||||
}
|
||||
|
||||
// Seek implements Seeker interface. The underlying function will move the reader position.
|
||||
// It only returns error when the whence parameter is not in io.SeekStart, io.SeekCurrent, io.SeekEnd
|
||||
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
|
||||
n, err := r.seek(int(offset), whence)
|
||||
return int64(n), err
|
||||
@@ -182,15 +212,44 @@ func (r *Reader) readAhead(data []byte) (int, error) {
|
||||
return len(data), nil
|
||||
}
|
||||
|
||||
// ReadAhead reads buffer ahead of current position without moving the reader position.
|
||||
func (r *Reader) ReadAhead(data []byte) (int, error) {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
return r.readAhead(data)
|
||||
}
|
||||
|
||||
func (r *Reader) waitAvailable() (int, error) {
|
||||
if r.closed {
|
||||
return 0, ErrReaderClosed
|
||||
}
|
||||
|
||||
for r.readPos >= r.multiReader.lastWritePos {
|
||||
if r.closed {
|
||||
return 0, ErrReaderClosed
|
||||
}
|
||||
if r.multiReader.closed {
|
||||
return 0, ErrMultiReaderClosed
|
||||
}
|
||||
r.cond.Wait()
|
||||
}
|
||||
|
||||
return r.multiReader.lastWritePos - r.readPos, nil
|
||||
}
|
||||
|
||||
// WaitAvailable will only wait when Reader position reached Writer position.
|
||||
func (r *Reader) WaitAvailable() (int, error) {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
return r.waitAvailable()
|
||||
}
|
||||
|
||||
// Close implements ReadCloser interface. It will not return error.
|
||||
func (r *Reader) Close() error {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
r.closed = true
|
||||
|
||||
r.multiReader.removeReader(r.id)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user