Compare commits
8 Commits
Author | SHA1 | Date |
---|---|---|
owner | fe32cb94e0 | |
Suyono | 58f30bca72 | |
owner | 098860e3c4 | |
owner | 83175e939e | |
owner | 42ef3ec97e | |
owner | 4f01e91634 | |
Suyono | 08252778ba | |
Suyono | e3ccab83ab |
|
@ -2,9 +2,11 @@ package multireader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMultiRead(t *testing.T) {
|
func TestMultiRead(t *testing.T) {
|
||||||
|
@ -46,9 +48,10 @@ func TestReadWrite(t *testing.T) {
|
||||||
|
|
||||||
writeFunc := func(t *testing.T, m *MultiReader, data []byte, wg *sync.WaitGroup) {
|
writeFunc := func(t *testing.T, m *MultiReader, data []byte, wg *sync.WaitGroup) {
|
||||||
var (
|
var (
|
||||||
n int
|
n int
|
||||||
dl int
|
dl int
|
||||||
r int
|
r int
|
||||||
|
loopFlag bool
|
||||||
)
|
)
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
|
@ -60,13 +63,21 @@ func TestReadWrite(t *testing.T) {
|
||||||
r, _ = m.Write(data[n : n+512+dl])
|
r, _ = m.Write(data[n : n+512+dl])
|
||||||
}
|
}
|
||||||
n += r
|
n += r
|
||||||
|
if rand.Intn(5) == 0 {
|
||||||
|
loopFlag = false
|
||||||
|
for !loopFlag {
|
||||||
|
if rand.Intn(100) == 0 {
|
||||||
|
loopFlag = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ = m.Close()
|
_ = m.Close()
|
||||||
}
|
}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go writeFunc(t, m, src, wg)
|
go writeFunc(t, m, src, wg)
|
||||||
|
|
||||||
readFunc := func(t *testing.T, r *Reader, data []byte, wg *sync.WaitGroup) {
|
readFunc := func(t *testing.T, r *Reader, data []byte, wg *sync.WaitGroup, split bool) {
|
||||||
var (
|
var (
|
||||||
offset int
|
offset int
|
||||||
n int
|
n int
|
||||||
|
@ -77,10 +88,30 @@ func TestReadWrite(t *testing.T) {
|
||||||
offset = 0
|
offset = 0
|
||||||
for {
|
for {
|
||||||
if offset+1024 < len(data) {
|
if offset+1024 < len(data) {
|
||||||
n, err = r.Read(data[offset : offset+1024])
|
if split {
|
||||||
if err != nil {
|
_, err = r.WaitAvailable()
|
||||||
t.Logf("reader got error: %v", err)
|
if err != nil {
|
||||||
break
|
t.Logf("reader got error on WaitAvailable: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
n, err = r.ReadAhead(data[offset : offset+1024])
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("reader got error on ReadAhead: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
n -= 5
|
||||||
|
_, err = r.Seek(int64(n), io.SeekCurrent)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("reader got error on Seek: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
n, err = r.Read(data[offset : offset+1024])
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("reader got error: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
n, err = r.Read(data[offset:])
|
n, err = r.Read(data[offset:])
|
||||||
|
@ -95,11 +126,11 @@ func TestReadWrite(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go readFunc(t, r1, dst1, wg)
|
go readFunc(t, r1, dst1, wg, false)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go readFunc(t, r2, dst2, wg)
|
go readFunc(t, r2, dst2, wg, false)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go readFunc(t, r3, dst3, wg)
|
go readFunc(t, r3, dst3, wg, true)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
if !bytes.Equal(src, dst1) {
|
if !bytes.Equal(src, dst1) {
|
||||||
|
@ -112,3 +143,149 @@ func TestReadWrite(t *testing.T) {
|
||||||
t.Error("src and dst3 mismatch")
|
t.Error("src and dst3 mismatch")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReaderClose(t *testing.T) {
|
||||||
|
var (
|
||||||
|
m *MultiReader
|
||||||
|
r1 *Reader
|
||||||
|
r2 *Reader
|
||||||
|
r3 *Reader
|
||||||
|
src, dst, dst1, dst3 []byte
|
||||||
|
err error
|
||||||
|
found bool
|
||||||
|
ctr int
|
||||||
|
)
|
||||||
|
|
||||||
|
src = make([]byte, 16)
|
||||||
|
dst = make([]byte, 32)
|
||||||
|
dst1 = dst[:16]
|
||||||
|
dst3 = dst[16:]
|
||||||
|
m = NewMultiReader(0)
|
||||||
|
r1 = m.NewReader()
|
||||||
|
r2 = m.NewReader()
|
||||||
|
r3 = m.NewReader()
|
||||||
|
|
||||||
|
_, _ = rand.Read(src)
|
||||||
|
_, err = m.Write(src)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error setting up data: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
found = false
|
||||||
|
for ctr = range m.pos {
|
||||||
|
if m.pos[ctr].id == r2.id {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Error("unexpected behavior, r2 should exist in reader position list")
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = r2.Close()
|
||||||
|
_, err = r2.Read(dst)
|
||||||
|
if err == nil && err != ErrReaderClosed {
|
||||||
|
t.Error("unexpected behavior, r2 should be closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
found = false
|
||||||
|
for ctr = range m.pos {
|
||||||
|
if m.pos[ctr].id == r2.id {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if found {
|
||||||
|
t.Error("unexpected behavior, r2 should be removed from reader position list")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = r1.Read(dst1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failing to read from r1")
|
||||||
|
}
|
||||||
|
if !bytes.Equal(dst1, src) {
|
||||||
|
t.Error("invalid result reading from r1")
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = m.Close()
|
||||||
|
_, err = r3.Read(dst3)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected behavior, cannot read form r3: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(src, dst3) {
|
||||||
|
t.Error("invalid result reading from r3")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = r3.WaitAvailable()
|
||||||
|
if err == nil {
|
||||||
|
t.Error("unexpected behavior; WaitAvailable when readpos == lastWritePos should fail")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = r3.ReadAhead(dst3)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("unexpected behavior; ReadAhead when readpos == lastWritePos should fail")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNotify(t *testing.T) {
|
||||||
|
src := make([]byte, 16)
|
||||||
|
_, _ = rand.Read(src)
|
||||||
|
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
cr := make(chan int, 3)
|
||||||
|
m := NewMultiReader(0)
|
||||||
|
r := make([]*Reader, 0)
|
||||||
|
r = append(r, m.NewReader(), m.NewReader(), m.NewReader())
|
||||||
|
for i := range r {
|
||||||
|
r[i].NotifyFunc(func(ix int) {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
|
cr <- ix
|
||||||
|
})
|
||||||
|
}
|
||||||
|
_, _ = m.Write(src)
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
nctr := 0
|
||||||
|
stop := false
|
||||||
|
for !stop {
|
||||||
|
select {
|
||||||
|
case rd := <-cr:
|
||||||
|
nctr++
|
||||||
|
if rd != 16 {
|
||||||
|
t.Error("unexpected notify value")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
stop = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if nctr != 3 {
|
||||||
|
t.Error("unexpected number of notify event")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _ = r[0].Seek(16, io.SeekCurrent)
|
||||||
|
_, _ = r[1].Seek(8, io.SeekCurrent)
|
||||||
|
_, _ = r[2].Seek(16, io.SeekCurrent)
|
||||||
|
|
||||||
|
_, _ = m.Write(src[:8])
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
wg.Wait()
|
||||||
|
stop = false
|
||||||
|
nctr = 0
|
||||||
|
for !stop {
|
||||||
|
select {
|
||||||
|
case rd := <-cr:
|
||||||
|
nctr++
|
||||||
|
if rd != 8 {
|
||||||
|
t.Error("unexpected notify value")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
stop = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if nctr != 2 {
|
||||||
|
t.Error("unexpected number of notify event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ type MultiReader struct {
|
||||||
buffer []byte
|
buffer []byte
|
||||||
readers []*Reader
|
readers []*Reader
|
||||||
pos []*Reader
|
pos []*Reader
|
||||||
|
notif []*Reader
|
||||||
lastWritePos int
|
lastWritePos int
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
@ -28,11 +29,13 @@ type Reader struct {
|
||||||
multiReader *MultiReader
|
multiReader *MultiReader
|
||||||
mtx *sync.Mutex
|
mtx *sync.Mutex
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
|
notify func(int)
|
||||||
id int
|
id int
|
||||||
readPos int
|
readPos int
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//MultiReader initialize new MultiReader object. The object also implements WriteCloser.
|
||||||
func NewMultiReader(len int) *MultiReader {
|
func NewMultiReader(len int) *MultiReader {
|
||||||
if len < 0 {
|
if len < 0 {
|
||||||
len = 0
|
len = 0
|
||||||
|
@ -46,23 +49,39 @@ func NewMultiReader(len int) *MultiReader {
|
||||||
buffer: buff,
|
buffer: buff,
|
||||||
readers: make([]*Reader, 0),
|
readers: make([]*Reader, 0),
|
||||||
pos: make([]*Reader, 0),
|
pos: make([]*Reader, 0),
|
||||||
|
notif: make([]*Reader, 0),
|
||||||
lastWritePos: 0,
|
lastWritePos: 0,
|
||||||
closed: false,
|
closed: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MultiReader) notify(dataLen int) {
|
||||||
|
m.cond.Broadcast()
|
||||||
|
for _, reader := range m.notif {
|
||||||
|
if reader.notify != nil && reader.readPos >= m.lastWritePos-dataLen && !reader.closed {
|
||||||
|
go func() {
|
||||||
|
reader.notify(dataLen)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write implements Writer interface. The function will not return error. Error return only satisfy the interface.
|
||||||
func (m *MultiReader) Write(data []byte) (int, error) {
|
func (m *MultiReader) Write(data []byte) (int, error) {
|
||||||
m.mtx.Lock()
|
m.mtx.Lock()
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
defer m.cond.Broadcast()
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
tmpBuff []byte
|
tmpBuff []byte
|
||||||
|
dataLen int
|
||||||
)
|
)
|
||||||
|
|
||||||
if m.closed {
|
if m.closed {
|
||||||
return 0, ErrMultiReaderClosed
|
return 0, ErrMultiReaderClosed
|
||||||
}
|
}
|
||||||
|
defer func(ip *int) {
|
||||||
|
m.notify(*ip)
|
||||||
|
}(&dataLen)
|
||||||
|
|
||||||
if m.lastWritePos+len(data) > len(m.buffer) {
|
if m.lastWritePos+len(data) > len(m.buffer) {
|
||||||
tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer))
|
tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer))
|
||||||
|
@ -70,6 +89,8 @@ func (m *MultiReader) Write(data []byte) (int, error) {
|
||||||
}
|
}
|
||||||
copy(m.buffer[m.lastWritePos:], data)
|
copy(m.buffer[m.lastWritePos:], data)
|
||||||
m.lastWritePos += len(data)
|
m.lastWritePos += len(data)
|
||||||
|
dataLen = len(data)
|
||||||
|
//_ = dataLen //prevents lint complaining about ineffectual assignment
|
||||||
|
|
||||||
return len(data), nil
|
return len(data), nil
|
||||||
}
|
}
|
||||||
|
@ -116,6 +137,7 @@ func (m *MultiReader) removeReader(id int) {
|
||||||
m.pos = nb
|
m.pos = nb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close will not return error. The error return only satisfy the interface signature.
|
||||||
func (m *MultiReader) Close() error {
|
func (m *MultiReader) Close() error {
|
||||||
m.mtx.Lock()
|
m.mtx.Lock()
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
|
@ -123,6 +145,7 @@ func (m *MultiReader) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewReader initialize new Reader object. The object implements ReadCloser and Seeker interface.
|
||||||
func (m *MultiReader) NewReader() *Reader {
|
func (m *MultiReader) NewReader() *Reader {
|
||||||
m.mtx.Lock()
|
m.mtx.Lock()
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
|
@ -141,10 +164,44 @@ func (m *MultiReader) NewReader() *Reader {
|
||||||
return reader
|
return reader
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NotifyFunc sets up a callback function notify. The notify function will be called when there is a Write operation.
|
||||||
|
// Notify function passed to the NotifyFunc will executed in a separate go routine for each notify function.
|
||||||
|
// It's the caller responsibility to setup any synchronization to prevent data race.
|
||||||
|
func (r *Reader) NotifyFunc(notify func(int)) {
|
||||||
|
r.mtx.Lock()
|
||||||
|
defer r.mtx.Unlock()
|
||||||
|
|
||||||
|
r.notify = notify
|
||||||
|
|
||||||
|
var x int
|
||||||
|
switch len(r.multiReader.notif) {
|
||||||
|
case 0:
|
||||||
|
r.multiReader.notif = append(r.multiReader.notif, r)
|
||||||
|
case 1:
|
||||||
|
if r.multiReader.notif[0].id != r.id {
|
||||||
|
goto notifyFuncAppend2
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
goto notifyFuncAppend1
|
||||||
|
}
|
||||||
|
return
|
||||||
|
|
||||||
|
notifyFuncAppend1:
|
||||||
|
x = sort.Search(len(r.multiReader.notif), func(i int) bool { return r.multiReader.notif[i].id >= r.id })
|
||||||
|
if x < len(r.multiReader.notif) && r.id == x {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
notifyFuncAppend2:
|
||||||
|
r.multiReader.notif = append(r.multiReader.notif, r)
|
||||||
|
sort.Slice(r.multiReader.notif, func(i, j int) bool { return r.multiReader.notif[i].id < r.multiReader.notif[j].id })
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read implements Reader interface. If the reader reached the same position with the writer. It will wait.
|
||||||
|
// Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek()
|
||||||
func (r *Reader) Read(data []byte) (int, error) {
|
func (r *Reader) Read(data []byte) (int, error) {
|
||||||
r.mtx.Lock()
|
r.mtx.Lock()
|
||||||
defer r.mtx.Unlock()
|
defer r.mtx.Unlock()
|
||||||
defer r.cond.Broadcast()
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
n int
|
n int
|
||||||
|
@ -184,6 +241,8 @@ func (r *Reader) seek(offset int, whence int) (int, error) {
|
||||||
return r.readPos, nil
|
return r.readPos, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Seek implements Seeker interface. The underlying function will move the reader position.
|
||||||
|
// It only returns error when the whence parameter is not in io.SeekStart, io.SeekCurrent, io.SeekEnd
|
||||||
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
|
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
|
||||||
n, err := r.seek(int(offset), whence)
|
n, err := r.seek(int(offset), whence)
|
||||||
return int64(n), err
|
return int64(n), err
|
||||||
|
@ -203,6 +262,7 @@ func (r *Reader) readAhead(data []byte) (int, error) {
|
||||||
return len(data), nil
|
return len(data), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadAhead reads buffer ahead of current position without moving the reader position.
|
||||||
func (r *Reader) ReadAhead(data []byte) (int, error) {
|
func (r *Reader) ReadAhead(data []byte) (int, error) {
|
||||||
r.mtx.Lock()
|
r.mtx.Lock()
|
||||||
defer r.mtx.Unlock()
|
defer r.mtx.Unlock()
|
||||||
|
@ -210,10 +270,11 @@ func (r *Reader) ReadAhead(data []byte) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) waitAvailable() (int, error) {
|
func (r *Reader) waitAvailable() (int, error) {
|
||||||
|
if r.closed {
|
||||||
|
return 0, ErrReaderClosed
|
||||||
|
}
|
||||||
|
|
||||||
for r.readPos >= r.multiReader.lastWritePos {
|
for r.readPos >= r.multiReader.lastWritePos {
|
||||||
if r.closed {
|
|
||||||
return 0, ErrReaderClosed
|
|
||||||
}
|
|
||||||
if r.multiReader.closed {
|
if r.multiReader.closed {
|
||||||
return 0, ErrMultiReaderClosed
|
return 0, ErrMultiReaderClosed
|
||||||
}
|
}
|
||||||
|
@ -223,12 +284,14 @@ func (r *Reader) waitAvailable() (int, error) {
|
||||||
return r.multiReader.lastWritePos - r.readPos, nil
|
return r.multiReader.lastWritePos - r.readPos, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitAvailable will only wait when Reader position reached Writer position.
|
||||||
func (r *Reader) WaitAvailable() (int, error) {
|
func (r *Reader) WaitAvailable() (int, error) {
|
||||||
r.mtx.Lock()
|
r.mtx.Lock()
|
||||||
defer r.mtx.Unlock()
|
defer r.mtx.Unlock()
|
||||||
return r.waitAvailable()
|
return r.waitAvailable()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close implements ReadCloser interface. It will not return error.
|
||||||
func (r *Reader) Close() error {
|
func (r *Reader) Close() error {
|
||||||
r.mtx.Lock()
|
r.mtx.Lock()
|
||||||
defer r.mtx.Unlock()
|
defer r.mtx.Unlock()
|
||||||
|
|
Loading…
Reference in New Issue