6 Commits

2 changed files with 141 additions and 11 deletions

View File

@@ -2,6 +2,7 @@ package multireader
import ( import (
"bytes" "bytes"
"io"
"math/rand" "math/rand"
"sync" "sync"
"testing" "testing"
@@ -49,6 +50,7 @@ func TestReadWrite(t *testing.T) {
n int n int
dl int dl int
r int r int
loopFlag bool
) )
defer wg.Done() defer wg.Done()
@@ -60,13 +62,21 @@ func TestReadWrite(t *testing.T) {
r, _ = m.Write(data[n : n+512+dl]) r, _ = m.Write(data[n : n+512+dl])
} }
n += r n += r
if rand.Intn(5) == 0 {
loopFlag = false
for !loopFlag {
if rand.Intn(100) == 0 {
loopFlag = true
}
}
}
} }
_ = m.Close() _ = m.Close()
} }
wg.Add(1) wg.Add(1)
go writeFunc(t, m, src, wg) go writeFunc(t, m, src, wg)
readFunc := func(t *testing.T, r *Reader, data []byte, wg *sync.WaitGroup) { readFunc := func(t *testing.T, r *Reader, data []byte, wg *sync.WaitGroup, split bool) {
var ( var (
offset int offset int
n int n int
@@ -77,11 +87,31 @@ func TestReadWrite(t *testing.T) {
offset = 0 offset = 0
for { for {
if offset+1024 < len(data) { if offset+1024 < len(data) {
if split {
_, err = r.WaitAvailable()
if err != nil {
t.Logf("reader got error on WaitAvailable: %v", err)
break
}
n, err = r.ReadAhead(data[offset : offset+1024])
if err != nil {
t.Logf("reader got error on ReadAhead: %v", err)
break
}
n -= 5
_, err = r.Seek(int64(n), io.SeekCurrent)
if err != nil {
t.Logf("reader got error on Seek: %v", err)
break
}
} else {
n, err = r.Read(data[offset : offset+1024]) n, err = r.Read(data[offset : offset+1024])
if err != nil { if err != nil {
t.Logf("reader got error: %v", err) t.Logf("reader got error: %v", err)
break break
} }
}
} else { } else {
n, err = r.Read(data[offset:]) n, err = r.Read(data[offset:])
if err != nil { if err != nil {
@@ -95,11 +125,11 @@ func TestReadWrite(t *testing.T) {
} }
wg.Add(1) wg.Add(1)
go readFunc(t, r1, dst1, wg) go readFunc(t, r1, dst1, wg, false)
wg.Add(1) wg.Add(1)
go readFunc(t, r2, dst2, wg) go readFunc(t, r2, dst2, wg, false)
wg.Add(1) wg.Add(1)
go readFunc(t, r3, dst3, wg) go readFunc(t, r3, dst3, wg, true)
wg.Wait() wg.Wait()
if !bytes.Equal(src, dst1) { if !bytes.Equal(src, dst1) {
@@ -112,3 +142,87 @@ func TestReadWrite(t *testing.T) {
t.Error("src and dst3 mismatch") t.Error("src and dst3 mismatch")
} }
} }
func TestReaderClose(t *testing.T) {
var (
m *MultiReader
r1 *Reader
r2 *Reader
r3 *Reader
src, dst, dst1, dst3 []byte
err error
found bool
ctr int
)
src = make([]byte, 16)
dst = make([]byte, 32)
dst1 = dst[:16]
dst3 = dst[16:]
m = NewMultiReader(0)
r1 = m.NewReader()
r2 = m.NewReader()
r3 = m.NewReader()
_, _ = rand.Read(src)
_, err = m.Write(src)
if err != nil {
t.Fatalf("error setting up data: %v", err)
}
found = false
for ctr = range m.pos {
if m.pos[ctr].id == r2.id {
found = true
break
}
}
if !found {
t.Error("unexpected behavior, r2 should exist in reader position list")
}
_ = r2.Close()
_, err = r2.Read(dst)
if err == nil && err != ErrReaderClosed {
t.Error("unexpected behavior, r2 should be closed")
}
found = false
for ctr = range m.pos {
if m.pos[ctr].id == r2.id {
found = true
break
}
}
if found {
t.Error("unexpected behavior, r2 should be removed from reader position list")
}
_, err = r1.Read(dst1)
if err != nil {
t.Errorf("failing to read from r1")
}
if !bytes.Equal(dst1, src) {
t.Error("invalid result reading from r1")
}
_ = m.Close()
_, err = r3.Read(dst3)
if err != nil {
t.Errorf("unexpected behavior, cannot read form r3: %v", err)
}
if !bytes.Equal(src, dst3) {
t.Error("invalid result reading from r3")
}
_, err = r3.WaitAvailable()
if err == nil {
t.Error("unexpected behavior; WaitAvailable when readpos == lastWritePos should fail")
}
_, err = r3.ReadAhead(dst3)
if err == nil {
t.Error("unexpected behavior; ReadAhead when readpos == lastWritePos should fail")
}
}

View File

@@ -1,3 +1,4 @@
//Package multireader provides capability for single writer to multiple reader
package multireader package multireader
import ( import (
@@ -33,6 +34,7 @@ type Reader struct {
closed bool closed bool
} }
//MultiReader initialize new MultiReader object. The object also implements WriteCloser.
func NewMultiReader(len int) *MultiReader { func NewMultiReader(len int) *MultiReader {
if len < 0 { if len < 0 {
len = 0 len = 0
@@ -51,6 +53,7 @@ func NewMultiReader(len int) *MultiReader {
} }
} }
// Write implements Writer interface. The function will not return error. Error return only satisfy the interface.
func (m *MultiReader) Write(data []byte) (int, error) { func (m *MultiReader) Write(data []byte) (int, error) {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
@@ -116,6 +119,7 @@ func (m *MultiReader) removeReader(id int) {
m.pos = nb m.pos = nb
} }
// Close will not return error. The error return only satisfy the interface signature.
func (m *MultiReader) Close() error { func (m *MultiReader) Close() error {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
@@ -123,6 +127,7 @@ func (m *MultiReader) Close() error {
return nil return nil
} }
// NewReader initialize new Reader object. The object implements ReadCloser and Seeker interface.
func (m *MultiReader) NewReader() *Reader { func (m *MultiReader) NewReader() *Reader {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
@@ -141,6 +146,8 @@ func (m *MultiReader) NewReader() *Reader {
return reader return reader
} }
// Read implements Reader interface. If the reader reached the same position with the writer. It will wait.
// Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek()
func (r *Reader) Read(data []byte) (int, error) { func (r *Reader) Read(data []byte) (int, error) {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
@@ -184,6 +191,8 @@ func (r *Reader) seek(offset int, whence int) (int, error) {
return r.readPos, nil return r.readPos, nil
} }
// Seek implements Seeker interface. The underlying function will move the reader position.
// It only returns error when the whence parameter is not in io.SeekStart, io.SeekCurrent, io.SeekEnd
func (r *Reader) Seek(offset int64, whence int) (int64, error) { func (r *Reader) Seek(offset int64, whence int) (int64, error) {
n, err := r.seek(int(offset), whence) n, err := r.seek(int(offset), whence)
return int64(n), err return int64(n), err
@@ -203,6 +212,7 @@ func (r *Reader) readAhead(data []byte) (int, error) {
return len(data), nil return len(data), nil
} }
// ReadAhead reads buffer ahead of current position without moving the reader position.
func (r *Reader) ReadAhead(data []byte) (int, error) { func (r *Reader) ReadAhead(data []byte) (int, error) {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
@@ -210,6 +220,10 @@ func (r *Reader) ReadAhead(data []byte) (int, error) {
} }
func (r *Reader) waitAvailable() (int, error) { func (r *Reader) waitAvailable() (int, error) {
if r.closed {
return 0, ErrReaderClosed
}
for r.readPos >= r.multiReader.lastWritePos { for r.readPos >= r.multiReader.lastWritePos {
if r.closed { if r.closed {
return 0, ErrReaderClosed return 0, ErrReaderClosed
@@ -223,12 +237,14 @@ func (r *Reader) waitAvailable() (int, error) {
return r.multiReader.lastWritePos - r.readPos, nil return r.multiReader.lastWritePos - r.readPos, nil
} }
// WaitAvailable will only wait when Reader position reached Writer position.
func (r *Reader) WaitAvailable() (int, error) { func (r *Reader) WaitAvailable() (int, error) {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
return r.waitAvailable() return r.waitAvailable()
} }
// Close implements ReadCloser interface. It will not return error.
func (r *Reader) Close() error { func (r *Reader) Close() error {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()