Compare commits
No commits in common. "master" and "dev-docs" have entirely different histories.
@ -6,7 +6,6 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMultiRead(t *testing.T) {
|
func TestMultiRead(t *testing.T) {
|
||||||
@ -227,65 +226,3 @@ func TestReaderClose(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNotify(t *testing.T) {
|
|
||||||
src := make([]byte, 16)
|
|
||||||
_, _ = rand.Read(src)
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
cr := make(chan int, 3)
|
|
||||||
m := NewMultiReader(0)
|
|
||||||
r := make([]*Reader, 0)
|
|
||||||
r = append(r, m.NewReader(), m.NewReader(), m.NewReader())
|
|
||||||
for i := range r {
|
|
||||||
r[i].NotifyFunc(func(ix int) {
|
|
||||||
wg.Add(1)
|
|
||||||
defer wg.Done()
|
|
||||||
cr <- ix
|
|
||||||
})
|
|
||||||
}
|
|
||||||
_, _ = m.Write(src)
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
nctr := 0
|
|
||||||
stop := false
|
|
||||||
for !stop {
|
|
||||||
select {
|
|
||||||
case rd := <-cr:
|
|
||||||
nctr++
|
|
||||||
if rd != 16 {
|
|
||||||
t.Error("unexpected notify value")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
stop = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if nctr != 3 {
|
|
||||||
t.Error("unexpected number of notify event")
|
|
||||||
}
|
|
||||||
|
|
||||||
_, _ = r[0].Seek(16, io.SeekCurrent)
|
|
||||||
_, _ = r[1].Seek(8, io.SeekCurrent)
|
|
||||||
_, _ = r[2].Seek(16, io.SeekCurrent)
|
|
||||||
|
|
||||||
_, _ = m.Write(src[:8])
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
wg.Wait()
|
|
||||||
stop = false
|
|
||||||
nctr = 0
|
|
||||||
for !stop {
|
|
||||||
select {
|
|
||||||
case rd := <-cr:
|
|
||||||
nctr++
|
|
||||||
if rd != 8 {
|
|
||||||
t.Error("unexpected notify value")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
stop = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if nctr != 2 {
|
|
||||||
t.Error("unexpected number of notify event")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
//Package multireader provides capability for single writer to multiple reader
|
||||||
package multireader
|
package multireader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -20,7 +21,6 @@ type MultiReader struct {
|
|||||||
buffer []byte
|
buffer []byte
|
||||||
readers []*Reader
|
readers []*Reader
|
||||||
pos []*Reader
|
pos []*Reader
|
||||||
notif []*Reader
|
|
||||||
lastWritePos int
|
lastWritePos int
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
@ -29,7 +29,6 @@ type Reader struct {
|
|||||||
multiReader *MultiReader
|
multiReader *MultiReader
|
||||||
mtx *sync.Mutex
|
mtx *sync.Mutex
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
notify func(int)
|
|
||||||
id int
|
id int
|
||||||
readPos int
|
readPos int
|
||||||
closed bool
|
closed bool
|
||||||
@ -49,39 +48,24 @@ func NewMultiReader(len int) *MultiReader {
|
|||||||
buffer: buff,
|
buffer: buff,
|
||||||
readers: make([]*Reader, 0),
|
readers: make([]*Reader, 0),
|
||||||
pos: make([]*Reader, 0),
|
pos: make([]*Reader, 0),
|
||||||
notif: make([]*Reader, 0),
|
|
||||||
lastWritePos: 0,
|
lastWritePos: 0,
|
||||||
closed: false,
|
closed: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MultiReader) notify(dataLen int) {
|
|
||||||
m.cond.Broadcast()
|
|
||||||
for _, reader := range m.notif {
|
|
||||||
if reader.notify != nil && reader.readPos >= m.lastWritePos-dataLen && !reader.closed {
|
|
||||||
go func() {
|
|
||||||
reader.notify(dataLen)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write implements Writer interface. The function will not return error. Error return only satisfy the interface.
|
// Write implements Writer interface. The function will not return error. Error return only satisfy the interface.
|
||||||
func (m *MultiReader) Write(data []byte) (int, error) {
|
func (m *MultiReader) Write(data []byte) (int, error) {
|
||||||
m.mtx.Lock()
|
m.mtx.Lock()
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
|
defer m.cond.Broadcast()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
tmpBuff []byte
|
tmpBuff []byte
|
||||||
dataLen int
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if m.closed {
|
if m.closed {
|
||||||
return 0, ErrMultiReaderClosed
|
return 0, ErrMultiReaderClosed
|
||||||
}
|
}
|
||||||
defer func(ip *int) {
|
|
||||||
m.notify(*ip)
|
|
||||||
}(&dataLen)
|
|
||||||
|
|
||||||
if m.lastWritePos+len(data) > len(m.buffer) {
|
if m.lastWritePos+len(data) > len(m.buffer) {
|
||||||
tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer))
|
tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer))
|
||||||
@ -89,8 +73,6 @@ func (m *MultiReader) Write(data []byte) (int, error) {
|
|||||||
}
|
}
|
||||||
copy(m.buffer[m.lastWritePos:], data)
|
copy(m.buffer[m.lastWritePos:], data)
|
||||||
m.lastWritePos += len(data)
|
m.lastWritePos += len(data)
|
||||||
dataLen = len(data)
|
|
||||||
//_ = dataLen //prevents lint complaining about ineffectual assignment
|
|
||||||
|
|
||||||
return len(data), nil
|
return len(data), nil
|
||||||
}
|
}
|
||||||
@ -164,44 +146,12 @@ func (m *MultiReader) NewReader() *Reader {
|
|||||||
return reader
|
return reader
|
||||||
}
|
}
|
||||||
|
|
||||||
// NotifyFunc sets up a callback function notify. The notify function will be called when there is a Write operation.
|
|
||||||
// Notify function passed to the NotifyFunc will executed in a separate go routine for each notify function.
|
|
||||||
// It's the caller responsibility to setup any synchronization to prevent data race.
|
|
||||||
func (r *Reader) NotifyFunc(notify func(int)) {
|
|
||||||
r.mtx.Lock()
|
|
||||||
defer r.mtx.Unlock()
|
|
||||||
|
|
||||||
r.notify = notify
|
|
||||||
|
|
||||||
var x int
|
|
||||||
switch len(r.multiReader.notif) {
|
|
||||||
case 0:
|
|
||||||
r.multiReader.notif = append(r.multiReader.notif, r)
|
|
||||||
case 1:
|
|
||||||
if r.multiReader.notif[0].id != r.id {
|
|
||||||
goto notifyFuncAppend2
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
goto notifyFuncAppend1
|
|
||||||
}
|
|
||||||
return
|
|
||||||
|
|
||||||
notifyFuncAppend1:
|
|
||||||
x = sort.Search(len(r.multiReader.notif), func(i int) bool { return r.multiReader.notif[i].id >= r.id })
|
|
||||||
if x < len(r.multiReader.notif) && r.id == x {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
notifyFuncAppend2:
|
|
||||||
r.multiReader.notif = append(r.multiReader.notif, r)
|
|
||||||
sort.Slice(r.multiReader.notif, func(i, j int) bool { return r.multiReader.notif[i].id < r.multiReader.notif[j].id })
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read implements Reader interface. If the reader reached the same position with the writer. It will wait.
|
// Read implements Reader interface. If the reader reached the same position with the writer. It will wait.
|
||||||
// Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek()
|
// Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek()
|
||||||
func (r *Reader) Read(data []byte) (int, error) {
|
func (r *Reader) Read(data []byte) (int, error) {
|
||||||
r.mtx.Lock()
|
r.mtx.Lock()
|
||||||
defer r.mtx.Unlock()
|
defer r.mtx.Unlock()
|
||||||
|
defer r.cond.Broadcast()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
n int
|
n int
|
||||||
@ -275,6 +225,9 @@ func (r *Reader) waitAvailable() (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for r.readPos >= r.multiReader.lastWritePos {
|
for r.readPos >= r.multiReader.lastWritePos {
|
||||||
|
if r.closed {
|
||||||
|
return 0, ErrReaderClosed
|
||||||
|
}
|
||||||
if r.multiReader.closed {
|
if r.multiReader.closed {
|
||||||
return 0, ErrMultiReaderClosed
|
return 0, ErrMultiReaderClosed
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user