Compare commits

...

5 Commits

2 changed files with 127 additions and 5 deletions

View File

@ -6,6 +6,7 @@ import (
"math/rand" "math/rand"
"sync" "sync"
"testing" "testing"
"time"
) )
func TestMultiRead(t *testing.T) { func TestMultiRead(t *testing.T) {
@ -226,3 +227,65 @@ func TestReaderClose(t *testing.T) {
} }
} }
func TestNotify(t *testing.T) {
src := make([]byte, 16)
_, _ = rand.Read(src)
wg := &sync.WaitGroup{}
cr := make(chan int, 3)
m := NewMultiReader(0)
r := make([]*Reader, 0)
r = append(r, m.NewReader(), m.NewReader(), m.NewReader())
for i := range r {
r[i].NotifyFunc(func(ix int) {
wg.Add(1)
defer wg.Done()
cr <- ix
})
}
_, _ = m.Write(src)
time.Sleep(time.Millisecond)
wg.Wait()
nctr := 0
stop := false
for !stop {
select {
case rd := <-cr:
nctr++
if rd != 16 {
t.Error("unexpected notify value")
}
default:
stop = true
}
}
if nctr != 3 {
t.Error("unexpected number of notify event")
}
_, _ = r[0].Seek(16, io.SeekCurrent)
_, _ = r[1].Seek(8, io.SeekCurrent)
_, _ = r[2].Seek(16, io.SeekCurrent)
_, _ = m.Write(src[:8])
time.Sleep(time.Millisecond)
wg.Wait()
stop = false
nctr = 0
for !stop {
select {
case rd := <-cr:
nctr++
if rd != 8 {
t.Error("unexpected notify value")
}
default:
stop = true
}
}
if nctr != 2 {
t.Error("unexpected number of notify event")
}
}

View File

@ -20,6 +20,7 @@ type MultiReader struct {
buffer []byte buffer []byte
readers []*Reader readers []*Reader
pos []*Reader pos []*Reader
notif []*Reader
lastWritePos int lastWritePos int
closed bool closed bool
} }
@ -28,11 +29,13 @@ type Reader struct {
multiReader *MultiReader multiReader *MultiReader
mtx *sync.Mutex mtx *sync.Mutex
cond *sync.Cond cond *sync.Cond
notify func(int)
id int id int
readPos int readPos int
closed bool closed bool
} }
//MultiReader initialize new MultiReader object. The object also implements WriteCloser.
func NewMultiReader(len int) *MultiReader { func NewMultiReader(len int) *MultiReader {
if len < 0 { if len < 0 {
len = 0 len = 0
@ -46,23 +49,39 @@ func NewMultiReader(len int) *MultiReader {
buffer: buff, buffer: buff,
readers: make([]*Reader, 0), readers: make([]*Reader, 0),
pos: make([]*Reader, 0), pos: make([]*Reader, 0),
notif: make([]*Reader, 0),
lastWritePos: 0, lastWritePos: 0,
closed: false, closed: false,
} }
} }
func (m *MultiReader) notify(dataLen int) {
m.cond.Broadcast()
for _, reader := range m.notif {
if reader.notify != nil && reader.readPos >= m.lastWritePos-dataLen && !reader.closed {
go func() {
reader.notify(dataLen)
}()
}
}
}
// Write implements Writer interface. The function will not return error. Error return only satisfy the interface.
func (m *MultiReader) Write(data []byte) (int, error) { func (m *MultiReader) Write(data []byte) (int, error) {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
defer m.cond.Broadcast()
var ( var (
tmpBuff []byte tmpBuff []byte
dataLen int
) )
if m.closed { if m.closed {
return 0, ErrMultiReaderClosed return 0, ErrMultiReaderClosed
} }
defer func(ip *int) {
m.notify(*ip)
}(&dataLen)
if m.lastWritePos+len(data) > len(m.buffer) { if m.lastWritePos+len(data) > len(m.buffer) {
tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer)) tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer))
@ -70,6 +89,8 @@ func (m *MultiReader) Write(data []byte) (int, error) {
} }
copy(m.buffer[m.lastWritePos:], data) copy(m.buffer[m.lastWritePos:], data)
m.lastWritePos += len(data) m.lastWritePos += len(data)
dataLen = len(data)
//_ = dataLen //prevents lint complaining about ineffectual assignment
return len(data), nil return len(data), nil
} }
@ -116,6 +137,7 @@ func (m *MultiReader) removeReader(id int) {
m.pos = nb m.pos = nb
} }
// Close will not return error. The error return only satisfy the interface signature.
func (m *MultiReader) Close() error { func (m *MultiReader) Close() error {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
@ -123,6 +145,7 @@ func (m *MultiReader) Close() error {
return nil return nil
} }
// NewReader initialize new Reader object. The object implements ReadCloser and Seeker interface.
func (m *MultiReader) NewReader() *Reader { func (m *MultiReader) NewReader() *Reader {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
@ -141,10 +164,44 @@ func (m *MultiReader) NewReader() *Reader {
return reader return reader
} }
// NotifyFunc sets up a callback function notify. The notify function will be called when there is a Write operation.
// Notify function passed to the NotifyFunc will executed in a separate go routine for each notify function.
// It's the caller responsibility to setup any synchronization to prevent data race.
func (r *Reader) NotifyFunc(notify func(int)) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.notify = notify
var x int
switch len(r.multiReader.notif) {
case 0:
r.multiReader.notif = append(r.multiReader.notif, r)
case 1:
if r.multiReader.notif[0].id != r.id {
goto notifyFuncAppend2
}
default:
goto notifyFuncAppend1
}
return
notifyFuncAppend1:
x = sort.Search(len(r.multiReader.notif), func(i int) bool { return r.multiReader.notif[i].id >= r.id })
if x < len(r.multiReader.notif) && r.id == x {
return
}
notifyFuncAppend2:
r.multiReader.notif = append(r.multiReader.notif, r)
sort.Slice(r.multiReader.notif, func(i, j int) bool { return r.multiReader.notif[i].id < r.multiReader.notif[j].id })
}
// Read implements Reader interface. If the reader reached the same position with the writer. It will wait.
// Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek()
func (r *Reader) Read(data []byte) (int, error) { func (r *Reader) Read(data []byte) (int, error) {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
defer r.cond.Broadcast()
var ( var (
n int n int
@ -184,6 +241,8 @@ func (r *Reader) seek(offset int, whence int) (int, error) {
return r.readPos, nil return r.readPos, nil
} }
// Seek implements Seeker interface. The underlying function will move the reader position.
// It only returns error when the whence parameter is not in io.SeekStart, io.SeekCurrent, io.SeekEnd
func (r *Reader) Seek(offset int64, whence int) (int64, error) { func (r *Reader) Seek(offset int64, whence int) (int64, error) {
n, err := r.seek(int(offset), whence) n, err := r.seek(int(offset), whence)
return int64(n), err return int64(n), err
@ -203,6 +262,7 @@ func (r *Reader) readAhead(data []byte) (int, error) {
return len(data), nil return len(data), nil
} }
// ReadAhead reads buffer ahead of current position without moving the reader position.
func (r *Reader) ReadAhead(data []byte) (int, error) { func (r *Reader) ReadAhead(data []byte) (int, error) {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
@ -215,9 +275,6 @@ func (r *Reader) waitAvailable() (int, error) {
} }
for r.readPos >= r.multiReader.lastWritePos { for r.readPos >= r.multiReader.lastWritePos {
if r.closed {
return 0, ErrReaderClosed
}
if r.multiReader.closed { if r.multiReader.closed {
return 0, ErrMultiReaderClosed return 0, ErrMultiReaderClosed
} }
@ -227,12 +284,14 @@ func (r *Reader) waitAvailable() (int, error) {
return r.multiReader.lastWritePos - r.readPos, nil return r.multiReader.lastWritePos - r.readPos, nil
} }
// WaitAvailable will only wait when Reader position reached Writer position.
func (r *Reader) WaitAvailable() (int, error) { func (r *Reader) WaitAvailable() (int, error) {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
return r.waitAvailable() return r.waitAvailable()
} }
// Close implements ReadCloser interface. It will not return error.
func (r *Reader) Close() error { func (r *Reader) Close() error {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()