Merge pull request 'basic functionality and docs' (#1) from dev-docs into master
Reviewed-on: #1
This commit is contained in:
commit
9553f87292
|
@ -0,0 +1,2 @@
|
|||
# MultiReader
|
||||
Single writer with multiple readers. Designed for small number of concurrent readers.
|
|
@ -91,6 +91,31 @@ func (m *MultiReader) shift() {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *MultiReader) removeReader(id int) {
|
||||
var (
|
||||
pin int
|
||||
nb []*Reader
|
||||
)
|
||||
for idx := range m.pos {
|
||||
if m.pos[idx].id == id {
|
||||
pin = idx
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
nb = make([]*Reader, len(m.pos)-1)
|
||||
if pin == 0 {
|
||||
copy(nb, m.pos[1:])
|
||||
} else if pin == len(m.pos)-1 {
|
||||
copy(nb, m.pos[:len(m.pos)-1])
|
||||
} else {
|
||||
copy(nb, m.pos[:pin])
|
||||
copy(nb[pin:], m.pos[pin+1:])
|
||||
}
|
||||
|
||||
m.pos = nb
|
||||
}
|
||||
|
||||
func (m *MultiReader) Close() error {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
@ -126,15 +151,9 @@ func (r *Reader) Read(data []byte) (int, error) {
|
|||
err error
|
||||
)
|
||||
|
||||
if r.closed {
|
||||
return 0, ErrReaderClosed
|
||||
}
|
||||
|
||||
for r.readPos >= r.multiReader.lastWritePos {
|
||||
if r.multiReader.closed {
|
||||
return 0, ErrMultiReaderClosed
|
||||
}
|
||||
r.cond.Wait()
|
||||
_, err = r.waitAvailable()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err = r.readAhead(data)
|
||||
|
@ -155,6 +174,8 @@ func (r *Reader) seek(offset int, whence int) (int, error) {
|
|||
r.readPos = offset
|
||||
case io.SeekEnd:
|
||||
r.readPos = r.multiReader.lastWritePos - offset
|
||||
default:
|
||||
return 0, ErrUnsupportedWhence
|
||||
}
|
||||
|
||||
r.multiReader.sortLastPos()
|
||||
|
@ -188,9 +209,31 @@ func (r *Reader) ReadAhead(data []byte) (int, error) {
|
|||
return r.readAhead(data)
|
||||
}
|
||||
|
||||
func (r *Reader) waitAvailable() (int, error) {
|
||||
for r.readPos >= r.multiReader.lastWritePos {
|
||||
if r.closed {
|
||||
return 0, ErrReaderClosed
|
||||
}
|
||||
if r.multiReader.closed {
|
||||
return 0, ErrMultiReaderClosed
|
||||
}
|
||||
r.cond.Wait()
|
||||
}
|
||||
|
||||
return r.multiReader.lastWritePos - r.readPos, nil
|
||||
}
|
||||
|
||||
func (r *Reader) WaitAvailable() (int, error) {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
return r.waitAvailable()
|
||||
}
|
||||
|
||||
func (r *Reader) Close() error {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
r.closed = true
|
||||
|
||||
r.multiReader.removeReader(r.id)
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue