|
|
|
|
@@ -20,6 +20,7 @@ type MultiReader struct {
|
|
|
|
|
buffer []byte
|
|
|
|
|
readers []*Reader
|
|
|
|
|
pos []*Reader
|
|
|
|
|
notif []*Reader
|
|
|
|
|
lastWritePos int
|
|
|
|
|
closed bool
|
|
|
|
|
}
|
|
|
|
|
@@ -28,11 +29,13 @@ type Reader struct {
|
|
|
|
|
multiReader *MultiReader
|
|
|
|
|
mtx *sync.Mutex
|
|
|
|
|
cond *sync.Cond
|
|
|
|
|
notify func(int)
|
|
|
|
|
id int
|
|
|
|
|
readPos int
|
|
|
|
|
closed bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//MultiReader initialize new MultiReader object. The object also implements WriteCloser.
|
|
|
|
|
func NewMultiReader(len int) *MultiReader {
|
|
|
|
|
if len < 0 {
|
|
|
|
|
len = 0
|
|
|
|
|
@@ -46,23 +49,39 @@ func NewMultiReader(len int) *MultiReader {
|
|
|
|
|
buffer: buff,
|
|
|
|
|
readers: make([]*Reader, 0),
|
|
|
|
|
pos: make([]*Reader, 0),
|
|
|
|
|
notif: make([]*Reader, 0),
|
|
|
|
|
lastWritePos: 0,
|
|
|
|
|
closed: false,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MultiReader) notify(dataLen int) {
|
|
|
|
|
m.cond.Broadcast()
|
|
|
|
|
for _, reader := range m.notif {
|
|
|
|
|
if reader.notify != nil && reader.readPos >= m.lastWritePos-dataLen && !reader.closed {
|
|
|
|
|
go func() {
|
|
|
|
|
reader.notify(dataLen)
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write implements Writer interface. The function will not return error. Error return only satisfy the interface.
|
|
|
|
|
func (m *MultiReader) Write(data []byte) (int, error) {
|
|
|
|
|
m.mtx.Lock()
|
|
|
|
|
defer m.mtx.Unlock()
|
|
|
|
|
defer m.cond.Broadcast()
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
tmpBuff []byte
|
|
|
|
|
dataLen int
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if m.closed {
|
|
|
|
|
return 0, ErrMultiReaderClosed
|
|
|
|
|
}
|
|
|
|
|
defer func(ip *int) {
|
|
|
|
|
m.notify(*ip)
|
|
|
|
|
}(&dataLen)
|
|
|
|
|
|
|
|
|
|
if m.lastWritePos+len(data) > len(m.buffer) {
|
|
|
|
|
tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer))
|
|
|
|
|
@@ -70,6 +89,8 @@ func (m *MultiReader) Write(data []byte) (int, error) {
|
|
|
|
|
}
|
|
|
|
|
copy(m.buffer[m.lastWritePos:], data)
|
|
|
|
|
m.lastWritePos += len(data)
|
|
|
|
|
dataLen = len(data)
|
|
|
|
|
//_ = dataLen //prevents lint complaining about ineffectual assignment
|
|
|
|
|
|
|
|
|
|
return len(data), nil
|
|
|
|
|
}
|
|
|
|
|
@@ -116,6 +137,7 @@ func (m *MultiReader) removeReader(id int) {
|
|
|
|
|
m.pos = nb
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close will not return error. The error return only satisfy the interface signature.
|
|
|
|
|
func (m *MultiReader) Close() error {
|
|
|
|
|
m.mtx.Lock()
|
|
|
|
|
defer m.mtx.Unlock()
|
|
|
|
|
@@ -123,6 +145,7 @@ func (m *MultiReader) Close() error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewReader initialize new Reader object. The object implements ReadCloser and Seeker interface.
|
|
|
|
|
func (m *MultiReader) NewReader() *Reader {
|
|
|
|
|
m.mtx.Lock()
|
|
|
|
|
defer m.mtx.Unlock()
|
|
|
|
|
@@ -141,10 +164,44 @@ func (m *MultiReader) NewReader() *Reader {
|
|
|
|
|
return reader
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NotifyFunc sets up a callback function notify. The notify function will be called when there is a Write operation.
|
|
|
|
|
// Notify function passed to the NotifyFunc will executed in a separate go routine for each notify function.
|
|
|
|
|
// It's the caller responsibility to setup any synchronization to prevent data race.
|
|
|
|
|
func (r *Reader) NotifyFunc(notify func(int)) {
|
|
|
|
|
r.mtx.Lock()
|
|
|
|
|
defer r.mtx.Unlock()
|
|
|
|
|
|
|
|
|
|
r.notify = notify
|
|
|
|
|
|
|
|
|
|
var x int
|
|
|
|
|
switch len(r.multiReader.notif) {
|
|
|
|
|
case 0:
|
|
|
|
|
r.multiReader.notif = append(r.multiReader.notif, r)
|
|
|
|
|
case 1:
|
|
|
|
|
if r.multiReader.notif[0].id != r.id {
|
|
|
|
|
goto notifyFuncAppend2
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
goto notifyFuncAppend1
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
notifyFuncAppend1:
|
|
|
|
|
x = sort.Search(len(r.multiReader.notif), func(i int) bool { return r.multiReader.notif[i].id >= r.id })
|
|
|
|
|
if x < len(r.multiReader.notif) && r.id == x {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
notifyFuncAppend2:
|
|
|
|
|
r.multiReader.notif = append(r.multiReader.notif, r)
|
|
|
|
|
sort.Slice(r.multiReader.notif, func(i, j int) bool { return r.multiReader.notif[i].id < r.multiReader.notif[j].id })
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read implements Reader interface. If the reader reached the same position with the writer. It will wait.
|
|
|
|
|
// Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek()
|
|
|
|
|
func (r *Reader) Read(data []byte) (int, error) {
|
|
|
|
|
r.mtx.Lock()
|
|
|
|
|
defer r.mtx.Unlock()
|
|
|
|
|
defer r.cond.Broadcast()
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
n int
|
|
|
|
|
@@ -184,6 +241,8 @@ func (r *Reader) seek(offset int, whence int) (int, error) {
|
|
|
|
|
return r.readPos, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Seek implements Seeker interface. The underlying function will move the reader position.
|
|
|
|
|
// It only returns error when the whence parameter is not in io.SeekStart, io.SeekCurrent, io.SeekEnd
|
|
|
|
|
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
|
|
|
|
|
n, err := r.seek(int(offset), whence)
|
|
|
|
|
return int64(n), err
|
|
|
|
|
@@ -203,6 +262,7 @@ func (r *Reader) readAhead(data []byte) (int, error) {
|
|
|
|
|
return len(data), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ReadAhead reads buffer ahead of current position without moving the reader position.
|
|
|
|
|
func (r *Reader) ReadAhead(data []byte) (int, error) {
|
|
|
|
|
r.mtx.Lock()
|
|
|
|
|
defer r.mtx.Unlock()
|
|
|
|
|
@@ -215,9 +275,6 @@ func (r *Reader) waitAvailable() (int, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for r.readPos >= r.multiReader.lastWritePos {
|
|
|
|
|
if r.closed {
|
|
|
|
|
return 0, ErrReaderClosed
|
|
|
|
|
}
|
|
|
|
|
if r.multiReader.closed {
|
|
|
|
|
return 0, ErrMultiReaderClosed
|
|
|
|
|
}
|
|
|
|
|
@@ -227,12 +284,14 @@ func (r *Reader) waitAvailable() (int, error) {
|
|
|
|
|
return r.multiReader.lastWritePos - r.readPos, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WaitAvailable will only wait when Reader position reached Writer position.
|
|
|
|
|
func (r *Reader) WaitAvailable() (int, error) {
|
|
|
|
|
r.mtx.Lock()
|
|
|
|
|
defer r.mtx.Unlock()
|
|
|
|
|
return r.waitAvailable()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close implements ReadCloser interface. It will not return error.
|
|
|
|
|
func (r *Reader) Close() error {
|
|
|
|
|
r.mtx.Lock()
|
|
|
|
|
defer r.mtx.Unlock()
|
|
|
|
|
|